Re: Write Spark Connection client application in Go

2023-09-13 Thread Martin Grund
This is absolutely awesome! Thank you so much for dedicating your time to
this project!


On Wed, Sep 13, 2023 at 6:04 AM Holden Karau  wrote:

> That’s so cool! Great work y’all :)
>
> On Tue, Sep 12, 2023 at 8:14 PM bo yang  wrote:
>
>> Hi Spark Friends,
>>
>> Anyone interested in using Golang to write Spark application? We created
>> a Spark Connect Go Client library
>> <https://github.com/apache/spark-connect-go>. Would love to hear
>> feedback/thoughts from the community.
>>
>> Please see the quick start guide
>> <https://github.com/apache/spark-connect-go/blob/master/quick-start.md>
>> about how to use it. Following is a very short Spark Connect application in
>> Go:
>>
>> func main() {
>>  spark, _ := sql.SparkSession.Builder.Remote("sc://localhost:15002").Build()
>>  defer spark.Stop()
>>
>>  df, _ := spark.Sql("select 'apple' as word, 123 as count union all select 'orange' as word, 456 as count")
>>  df.Show(100, false)
>>  df.Collect()
>>
>>  df.Write().Mode("overwrite").
>>  Format("parquet").
>>  Save("file:///tmp/spark-connect-write-example-output.parquet")
>>
>>  df = spark.Read().Format("parquet").
>>      Load("file:///tmp/spark-connect-write-example-output.parquet")
>>  df.Show(100, false)
>>
>>  df.CreateTempView("view1", true, false)
>>  df, _ = spark.Sql("select count, word from view1 order by count")
>> }
>>
>>
>> Many thanks to Martin, Hyukjin, Ruifeng and Denny for creating and
>> working together on this repo! Welcome more people to contribute :)
>>
>> Best,
>> Bo
>>
>>


[Feature Request] make unix_micros() and unix_millis() available in PySpark (pyspark.sql.functions)

2022-10-14 Thread Martin
Hi everyone,

In *Spark SQL* there are several timestamp related functions

   - unix_micros(timestamp)
   Returns the number of microseconds since 1970-01-01 00:00:00 UTC.
   - unix_millis(timestamp)
   Returns the number of milliseconds since 1970-01-01 00:00:00 UTC.
   Truncates higher levels of precision.

See https://spark.apache.org/docs/latest/sql-ref-functions-builtin.html

Currently these are *"missing" in pyspark.sql.functions*.
https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html#datetime-functions

I'd appreciate it if these were also available in PySpark.
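
To make the requested semantics concrete, here is a minimal plain-Python sketch of what the two functions compute. It only illustrates the arithmetic and is not Spark's implementation; the helper names mirror the SQL functions, and the sample timestamp is made up. Until the functions are exposed directly, SQL-only functions can usually be reached from PySpark through `pyspark.sql.functions.expr`, e.g. `expr("unix_micros(ts)")` for a hypothetical timestamp column `ts`.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def unix_micros(ts: datetime) -> int:
    """Microseconds since 1970-01-01 00:00:00 UTC (ts must be timezone-aware)."""
    delta = ts - EPOCH
    return (delta.days * 86_400 + delta.seconds) * 1_000_000 + delta.microseconds

def unix_millis(ts: datetime) -> int:
    """Milliseconds since the epoch; sub-millisecond precision is dropped."""
    return unix_micros(ts) // 1_000

# Illustrative timestamp: 1.0025 seconds after the epoch.
ts = datetime(1970, 1, 1, 0, 0, 1, 2_500, tzinfo=timezone.utc)
print(unix_micros(ts))  # 1002500
print(unix_millis(ts))  # 1002
```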

Cheers,
Martin


Re: Moving to Spark 3x from Spark2

2022-09-01 Thread Martin Andersson
You should check the release notes and upgrade instructions.

From: rajat kumar 
Sent: Thursday, September 1, 2022 12:44
To: user @spark 
Subject: Moving to Spark 3x from Spark2


Hello Members,

We want to move to Spark 3 from Spark2.4 .

Are there any changes we need to do at code level which can break the existing 
code?

Will it work by simply changing the version of spark & scala ?

Regards
Rajat


Question regarding checkpointing with kafka structured streaming

2022-08-22 Thread Martin Andersson
I was looking around for documentation on how checkpointing (or rather, 
delivery semantics) works when consuming from Kafka with Structured 
Streaming, and I stumbled across this old documentation (which somehow still 
exists in the latest versions) at 
https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#checkpoints.

This page (which I assume dates from around the time of Spark 2.4?) describes 
storing offsets using checkpointing as the least reliable method and goes on 
to explain how to use Kafka or an external store to commit offsets.

It also says
If you enable Spark checkpointing, offsets will be stored in the checkpoint. 
(...) Furthermore, you cannot recover from a checkpoint if your application 
code has changed.

This all leaves me with several questions:

  1.  Is the above quote still true for Spark 3, that the checkpoint will break 
if you change the code? (I know it does if you change the subscribe pattern)

  2.  Why was the option to manually commit offsets asynchronously to Kafka 
removed when it was deemed more reliable than checkpointing? Not to mention 
that storing offsets in Kafka allows you to use all the tools offered with 
Kafka to easily reset/rewind offsets on specific topics, which doesn't seem to 
be possible when using checkpoints.

  3.  From a user perspective, storing offsets in kafka offers more features. 
From a developer perspective, having to re-implement offset storage across 
several output systems (such as HDFS, AWS S3 and other object storages) seems 
like a lot of unnecessary work and re-inventing the wheel.
Is the discussion leading up to the decision to only support storing offsets 
with checkpointing documented anywhere, perhaps in a jira?

Thanks for your time


unsubscribe

2022-08-01 Thread Martin Soch




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Issues getting Apache Spark

2022-05-26 Thread Martin, Michael
Hello,

I'm writing to request assistance in getting Apache Spark on my laptop. I've 
followed instructions telling me to get Java, Python, Hadoop, Winutils, and 
Spark itself. I've followed instructions illustrating how to set my environment 
variables. For some reason, I still cannot get Spark to work on my laptop.

Michael Martin


Re: Spark on K8s - repeating annoying exception

2022-05-13 Thread Martin Grigorov
Hi,

On Mon, May 9, 2022 at 5:57 PM Shay Elbaz  wrote:

> Hi all,
>
>
>
> I apologize for reposting this from Stack Overflow, but it got very little
> attention and no comments.
>
>
>
> I'm using Spark 3.2.1 image that was built from the official distribution
> via `docker-image-tool.sh', on Kubernetes 1.18 cluster.
>
> Everything works fine, except for this error message on stdout every 90
> seconds:
>

Wild guess: K8S API polling ?!

https://spark.apache.org/docs/latest/running-on-kubernetes.html#spark-properties

- spark.kubernetes.executor.apiPollingInterval
- spark.kubernetes.executor.missingPodDetectDelta

but for both settings the default is 30s, not 90s



>
>
> WARN WatcherWebSocketListener: Exec Failure
>
> java.io.EOFException
>
> at okio.RealBufferedSource.require(RealBufferedSource.java:61)
>
> at okio.RealBufferedSource.readByte(RealBufferedSource.java:74)
>
> at
> okhttp3.internal.ws.WebSocketReader.readHeader(WebSocketReader.java:117)
>
> at
> okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:101)
>
> at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
>
> at
> okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
>
> at okhttp3.RealCall$AsyncCall.execute(RealCall.java:203)
>
> at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
> at java.lang.Thread.run(Thread.java:748)
>
>
>
> This message does not affect the application, but it's really annoying,
> especially for Jupyter users. The lack of details makes it very hard to
> debug.
>
> It appears on any submit variation - spark-submit, pyspark, spark-shell.
>
> I've found traces of it on the internet, but all occurrences were from
> older versions of Spark and were resolved by using "newer" version of
> fabric8 (4.x).
>
> Spark 3.2.1 already uses fabric8 version 5.4.1.
>
> I wonder if anyone else still sees this error in Spark 3.x, and has a
> resolution.
>
>
>
> Thanks,
>
> Shay.
>


Idea for improving performance when reading from hive-like partition folders and specifying a filter [Spark 3.2]

2022-05-01 Thread Martin
Hello,

I think I noticed some Spark behavior that might have an enormous potential
for performance improvement when reading files from a folder with (many
nested) hive-style partitions and at the same time applying a filter to the
partition columns.

Situation

I have JSON files with log information stored in physical folders
partitioned by year, month, day and hour:

/logs
|-- year=2020
|-- year=2021
`-- year=2022
    |-- month=01
    `-- month=02
        |-- day=01
        |-- day=...
        `-- day=13
            |-- hour=
            |-- hour=...
            `-- hour=0900
                |-- log01.json
                |-- 
                `-- log099133.json

I'd like to *only read specific JSON files* into a Dataframe (e.g. only
those *partitions* of the last 2 hours).

Background info

Spark generally supports *partition discovery* for hive-like folder
structures just like in my example:

*All built-in file sources (including Text/CSV/JSON/ORC/Parquet) are able
to discover and infer partitioning information automatically.*

https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery
.

So, when creating a dataframe by reading from the base path "/logs", the
dataframe will have the columns "year", "month", "day", "hour" - even when
these attributes are not contained in the JSON files directly. Spark
*derives* these columns from the folder names.

Problem

As stated above, I don't want to read *all *of those JSONs but only those
of the last two hours. Therefore I specify my dataframe as:

df1 = (spark
    .read
    .format('json')
    .schema(predefined_schema)
    .option('basePath', '/logs')
    .load('/logs')
    .filter('year=2022 AND month=04 AND day=19 AND hour>=1000 AND hour<=1200')
)
df1.explain(mode="formatted")

Executing this already takes a *couple of minutes* - even though no actual
data is read. Looking at the Spark logs, I found that Spark is *exploring
the whole file structure* under '/logs' - so the content of every single
folder under '/logs' is listed. I'm wondering why that is. Given the specified
filter, Spark could use this information already during folder
exploration and take a shortcut. It would only need to list the folders
under '/logs/', then realize that all folders except '/logs/year=2022' are
irrelevant; proceed to explore the folders under '/logs/year=2022/', then
realize that all folders except '/logs/year=2022/month=04' are irrelevant;
and so on...

I implemented a function that basically does the folder exploration up
front and assembles a list of relevant folder paths.

relevant_folders = [
    '/logs/year=2022/month=04/day=19/hour=1000',
    '/logs/year=2022/month=04/day=19/hour=1100',
    '/logs/year=2022/month=04/day=19/hour=1200',
]
df2 = (spark
    .read
    .format('json')
    .schema(predefined_schema)
    .option('basePath', '/logs')
    .load(relevant_folders)
)
df2.explain(mode="formatted")

Executing this only *takes seconds*.
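
The up-front folder exploration described above could look roughly like the following sketch. It is not the original function: `prune_partitions` and its predicate format are hypothetical, and it assumes a local filesystem reachable through `os.listdir` (HDFS or object stores would need their own listing calls). The demo builds a throwaway directory tree instead of touching '/logs'.

```python
import os
import tempfile

def prune_partitions(base_path, predicates):
    """Return partition folders under base_path that satisfy all predicates.

    predicates: list of (key, test) pairs, one per nesting level, in order.
    test receives the raw string value parsed from a 'key=value' folder name.
    """
    folders = [base_path]
    for key, test in predicates:
        next_level = []
        for folder in folders:
            # One list operation per folder that survived the previous level.
            for entry in sorted(os.listdir(folder)):
                name, sep, value = entry.partition("=")
                if sep and name == key and test(value):
                    next_level.append(os.path.join(folder, entry))
        folders = next_level
    return folders

# Demo on a temporary tree that mimics the hive-style layout.
with tempfile.TemporaryDirectory() as root:
    for hour in ("0900", "1000", "1100", "1200", "1300"):
        os.makedirs(os.path.join(root, "year=2022", "month=04", "day=19", f"hour={hour}"))
    os.makedirs(os.path.join(root, "year=2021", "month=04", "day=19", "hour=1000"))

    found = prune_partitions(root, [
        ("year", lambda v: int(v) == 2022),
        ("month", lambda v: int(v) == 4),
        ("day", lambda v: int(v) == 19),
        ("hour", lambda v: 1000 <= int(v) <= 1200),
    ])
    print([os.path.relpath(p, root) for p in found])
```

The returned list can be passed to `.load(...)` in the same way as relevant_folders above.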

I was wondering if this should not rather be some built-in functionality of
Spark.

One more thing

Don't get me wrong: both dataframes above lead to the more or less same
execution plan and have similar execution times for the actual read
operation. In the end in both cases Spark *only reads those physical JSON
files fitting the filter conditions*. However *specifying* the dataframe
df1 took way longer than df2.

Doing some math

For simplicity let's assume that we have data of three full years, and
every month has 30 days.

For df1 that's roughly *27,000 list operations* (root: 1, year: 3, month: 36,
day: 1,080, hour: 25,920, assuming 24 hour folders per day)

For df2 that's 3 list operations; but I did 4 more list operations upfront
in order to come up with the relevant_folders list; so *7 list operations* in
total (root: 1, year: 1, month: 1, day: 1, hour: 3)
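
Counts like these can be sanity-checked with a few lines of Python. This is only a sketch under two assumptions that are not spelled out in the post: every directory costs exactly one list call, and each day holds 24 hour folders. The function name `list_operations` is made up for illustration.

```python
def list_operations(fanouts):
    """Count list calls for a directory tree, one call per directory.

    fanouts[i] is the number of subfolders inside each folder on level i,
    starting at the root. Returns (directories per level, total calls).
    """
    dirs_per_level = [1]  # the root itself
    for fanout in fanouts:
        dirs_per_level.append(dirs_per_level[-1] * fanout)
    return dirs_per_level, sum(dirs_per_level)

# Full scan: 3 years, 12 months each, 30 days each, 24 hours each.
full_levels, full_total = list_operations([3, 12, 30, 24])
# Pruned walk: one relevant year, month and day, then three hour folders.
pruned_levels, pruned_total = list_operations([1, 1, 1, 3])

print("full scan:", full_levels, full_total)
print("pruned:   ", pruned_levels, pruned_total)
```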

Cheers

Martin


Re: Spark on K8s , some applications ended ungracefully

2022-04-01 Thread Martin Grigorov
Hi,

On Thu, Mar 31, 2022 at 4:18 PM Pralabh Kumar 
wrote:

> Hi Spark Team
>
> Some of my spark applications on K8s ended with the below error . These
> applications though completed successfully (as per the event log
> SparkListenerApplicationEnd event at the end)
> still have event files with .inprogress. This causes the application to be
> shown as in progress in SHS.
>
> Spark v : 3.0.1
>

I'd suggest trying a newer version, e.g. 3.2.1 or even one built
from branch-3.3.



>
>
>
> 22/03/31 08:33:34 WARN ShutdownHookManager: ShutdownHook '$anon$2'
> timeout, java.util.concurrent.TimeoutException
>
> java.util.concurrent.TimeoutException
>
> at java.util.concurrent.FutureTask.get(FutureTask.java:205)
>
> at
> org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:68)
>
> 22/03/31 08:33:34 WARN SparkContext: Ignoring Exception while stopping
> SparkContext from shutdown hook
>
> java.lang.InterruptedException
>
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
>
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088)
>
> at
> java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475)
>
> at
> org.apache.spark.util.ThreadUtils$.shutdown(ThreadUtils.scala:348)
>
>
>
>
>
> Please let me know if there is a solution for it ..
>
> Regards
>
> Pralabh Kumar
>


Re: spark distribution build fails

2022-03-17 Thread Martin Grigorov
Hi,

For the mail archives: this error happens when the user has MAVEN_OPTS env
var pre-exported. In this case ./build/mvn|sbt does not export its own
MAVEN_OPTS with the -XssXYZ value, and the default one is too low and leads
to the StackOverflowError

On Mon, Mar 14, 2022 at 11:13 PM Bulldog20630405 
wrote:

>
> thanx; that worked great!
>
> On Mon, Mar 14, 2022 at 11:17 AM Sean Owen  wrote:
>
>> Try increasing the stack size in the build. It's the Xss argument you
>> find in various parts of the pom or sbt build. I have seen this and not
>> sure why it happens on certain envs, but that's the workaround
>>
>> On Mon, Mar 14, 2022, 8:59 AM Bulldog20630405 
>> wrote:
>>
>>>
>>> using tag v3.2.1 with java 8 getting a stackoverflow when building the
>>> distribution:
>>>
>>> > alias mvn
>>> alias mvn='mvn --errors --fail-at-end -DskipTests '
>>> > dev/make-distribution.sh --name 'hadoop-3.2' --pip --tgz -Phive
>>> -Phive-thriftserver -Pmesos -Pyarn -Pkubernetes
>>>
>>> [INFO]
>>> 
>>> [INFO] Reactor Summary for Spark Project Parent POM 3.2.1:
>>> [INFO]
>>> [INFO] Spark Project Parent POM ... SUCCESS [
>>>  2.978 s]
>>> [INFO] Spark Project Tags . SUCCESS [
>>>  6.585 s]
>>> [INFO] Spark Project Sketch ... SUCCESS [
>>>  6.684 s]
>>> [INFO] Spark Project Local DB . SUCCESS [
>>>  2.497 s]
>>> [INFO] Spark Project Networking ... SUCCESS [
>>>  6.312 s]
>>> [INFO] Spark Project Shuffle Streaming Service  SUCCESS [
>>>  3.925 s]
>>> [INFO] Spark Project Unsafe ... SUCCESS [
>>>  7.879 s]
>>> [INFO] Spark Project Launcher . SUCCESS [
>>>  2.238 s]
>>> [INFO] Spark Project Core . SUCCESS
>>> [02:33 min]
>>> [INFO] Spark Project ML Local Library . SUCCESS [
>>> 24.566 s]
>>> [INFO] Spark Project GraphX ... SUCCESS [
>>> 28.293 s]
>>> [INFO] Spark Project Streaming  SUCCESS [
>>> 51.070 s]
>>> [INFO] Spark Project Catalyst . FAILURE [
>>> 36.920 s]
>>> [INFO] Spark Project SQL .. SKIPPED
>>> [INFO] Spark Project ML Library ... SKIPPED
>>> [INFO] Spark Project Tools  SKIPPED
>>> 
>>>
>>> [INFO] Spark Avro . SKIPPED
>>> [INFO]
>>> 
>>> [INFO] BUILD FAILURE
>>> [INFO]
>>> 
>>> [INFO] Total time:  05:33 min
>>> [INFO] Finished at: 2022-03-14T13:45:15Z
>>> [INFO]
>>> 
>>> ---
>>> constituent[0]:
>>> file:/home/bulldog/software/maven/maven-3.8.4/conf/logging/
>>> constituent[1]:
>>> file:/home/bulldog/software/maven/maven-3.8.4/lib/maven-embedder-3.8.4.jar
>>> constituent[2]:
>>> file:/home/bulldog/software/maven/maven-3.8.4/lib/maven-settings-3.8.4.jar
>>> constituent[3]:
>>> file:/home/bulldog/software/maven/maven-3.8.4/lib/maven-settings-builder-3.8.4.jar
>>> constituent[4]:
>>> file:/home/bulldog/software/maven/maven-3.8.4/lib/maven-plugin-api-3.8.4.jar
>>> 
>>> ---
>>> Exception in thread "main" java.lang.StackOverflowError
>>> at
>>> scala.tools.nsc.transform.TypingTransformers$TypingTransformer.transform(TypingTransformers.scala:49)
>>> at
>>> scala.tools.nsc.transform.ExtensionMethods$Extender.transform(ExtensionMethods.scala:275)
>>> at
>>> scala.tools.nsc.transform.ExtensionMethods$Extender.transform(ExtensionMethods.scala:133)
>>> ...
>>>
>>>
>>>
>>>
>>>
>>>


Re: Encoders.STRING() causing performance problems in Java application

2022-02-21 Thread martin



Thanks a lot, Sean, for the comments. I realize I didn't provide enough 
background information to properly diagnose this issue.


In the meantime, I have created some test cases for isolating the 
problem and running some specific performance tests. The numbers are 
quite revealing: Running our Spark model individually on Strings takes 
about 8 s for the test data, whereas it takes 88 ms when run on the 
entire data in a single Dataset. This is a factor of roughly 100x. This 
gets even worse for larger datasets.


So, the root cause here is the way the Spark model is being called for 
one string at a time by the self-built prediction pipeline (which is 
also using other ML techniques apart from Spark). Needs some 
re-factoring...
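
The refactoring idea, paying the per-call overhead once per batch instead of once per string, can be sketched independently of Spark. Everything below is hypothetical: `predict_batch` stands in for "build one Dataset from many strings and call transform once", and `FakeModel` only counts how often it is invoked.

```python
def predict_batched(predict_batch, texts, batch_size=1000):
    """Run predict_batch on slices of texts instead of one string at a time."""
    results = []
    for start in range(0, len(texts), batch_size):
        results.extend(predict_batch(texts[start:start + batch_size]))
    return results

class FakeModel:
    """Stand-in for the real pipeline; records the number of invocations."""
    def __init__(self):
        self.calls = 0

    def __call__(self, batch):
        self.calls += 1  # fixed per-call overhead would be paid here
        return [len(text) for text in batch]

texts = ["first line", "second line", "third line"]

one_by_one = FakeModel()
per_string = [one_by_one([text])[0] for text in texts]  # one call per string

batched = FakeModel()
per_batch = predict_batched(batched, texts, batch_size=1000)  # a single call

print(one_by_one.calls, batched.calls)  # 3 1
print(per_string == per_batch)          # True
```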


Thanks again for the help.

Cheers,

Martin

Am 2022-02-18 13:41, schrieb Sean Owen:

That doesn't make a lot of sense. Are you profiling the driver, rather 
than executors where the work occurs?

Is your data set quite small such that small overheads look big?
Do you even need Spark if your data is not distributed - coming from 
the driver anyway?


The fact that a static final field did anything suggests something is 
amiss with your driver program. Are you perhaps inadvertently 
serializing your containing class with a bunch of other data by using 
its methods in a closure?
If your data is small it's not surprising that the overhead could be in 
just copying the data around, the two methods you cite, rather than the 
compute.

Too many things here to really say what's going on.

On Fri, Feb 18, 2022 at 12:42 AM  wrote:


Hello,

I am working on optimising the performance of a Java ML/NLP 
application based on Spark / SparkNLP. For prediction, I am applying a 
trained model on a Spark dataset which consists of one column with 
only one row. The dataset is created like this:


List<String> textList = Collections.singletonList(text);
Dataset<Row> data = sparkSession
    .createDataset(textList, Encoders.STRING())
    .withColumnRenamed(COL_VALUE, COL_TEXT);

The predictions are created like this:

PipelineModel fittedPipeline = pipeline.fit(dataset);

Dataset<Row> prediction = fittedPipeline.transform(dataset);

We noticed that the performance isn't quite as good as expected. After 
profiling the application with VisualVM, I noticed that the problem is 
with org.apache.spark.sql.Encoders.STRING() in the creation of the 
dataset, which by itself takes up about 75% of the time for the whole 
prediction method call.


So, is there a simpler and more efficient way of creating the required 
dataset, consisting of one column and one String row?


Thanks a lot.

Cheers,

Martin

Re: Encoders.STRING() causing performance problems in Java application

2022-02-18 Thread martin



Addendum: I have tried to replace localIterator with a forEach() call on 
the dataset directly, but this hasn't improved the performance.


If the forEach call is the issue, there probably isn't much that can be 
done to further improve things, other than perhaps trying to batch the 
prediction calls instead of running them line by line on the input file.


Cheers,

Martin

Am 2022-02-18 09:41, schrieb mar...@wunderlich.com:

I have been able to partially fix this issue by creating a static final 
field (i.e. a constant) for Encoders.STRING(). This removes the 
bottleneck associated with instantiating this Encoder. However, this 
moved the performance issue only to these two methods:


org.apache.spark.sql.SparkSession.createDataset (in the code below)

org.apache.spark.sql.Dataset.toLocalIterator ()

(ca. 40% each of execution time)

The second one is called when extracting the prediction results from 
the dataset:


Dataset<Row> datasetWithPredictions = predictor.predict(text);

Dataset<Row> tokensWithPredictions = 
datasetWithPredictions.select(TOKEN_RESULT, TOKEN_BEGIN, TOKEN_END, 
PREDICTION_RESULT);


Iterator<Row> rowIt = tokensWithPredictions.toLocalIterator();

while (rowIt.hasNext()) {
    Row row = rowIt.next();
    [...] // do stuff here to convert the row
}

Any ideas of how I might be able to further optimize this?

Cheers,

Martin

Am 2022-02-18 07:42, schrieb mar...@wunderlich.com:


Hello,

I am working on optimising the performance of a Java ML/NLP 
application based on Spark / SparkNLP. For prediction, I am applying a 
trained model on a Spark dataset which consists of one column with 
only one row. The dataset is created like this:


List<String> textList = Collections.singletonList(text);
Dataset<Row> data = sparkSession
    .createDataset(textList, Encoders.STRING())
    .withColumnRenamed(COL_VALUE, COL_TEXT);

The predictions are created like this:

PipelineModel fittedPipeline = pipeline.fit(dataset);

Dataset<Row> prediction = fittedPipeline.transform(dataset);

We noticed that the performance isn't quite as good as expected. After 
profiling the application with VisualVM, I noticed that the problem is 
with org.apache.spark.sql.Encoders.STRING() in the creation of the 
dataset, which by itself takes up about 75% of the time for the whole 
prediction method call.


So, is there a simpler and more efficient way of creating the required 
dataset, consisting of one column and one String row?


Thanks a lot.

Cheers,

Martin

Re: Encoders.STRING() causing performance problems in Java application

2022-02-18 Thread martin



I have been able to partially fix this issue by creating a static final 
field (i.e. a constant) for Encoders.STRING(). This removes the 
bottleneck associated with instantiating this Encoder. However, this 
moved the performance issue only to these two methods:


org.apache.spark.sql.SparkSession.createDataset (in the code below)

org.apache.spark.sql.Dataset.toLocalIterator ()

(ca. 40% each of execution time)

The second one is called when extracting the prediction results from the 
dataset:


Dataset<Row> datasetWithPredictions = predictor.predict(text);

Dataset<Row> tokensWithPredictions = 
datasetWithPredictions.select(TOKEN_RESULT, TOKEN_BEGIN, TOKEN_END, 
PREDICTION_RESULT);


Iterator<Row> rowIt = tokensWithPredictions.toLocalIterator();

while (rowIt.hasNext()) {
    Row row = rowIt.next();
    [...] // do stuff here to convert the row
}

Any ideas of how I might be able to further optimize this?

Cheers,

Martin

Am 2022-02-18 07:42, schrieb mar...@wunderlich.com:


Hello,

I am working on optimising the performance of a Java ML/NLP application 
based on Spark / SparkNLP. For prediction, I am applying a trained 
model on a Spark dataset which consists of one column with only one 
row. The dataset is created like this:


List<String> textList = Collections.singletonList(text);
Dataset<Row> data = sparkSession
    .createDataset(textList, Encoders.STRING())
    .withColumnRenamed(COL_VALUE, COL_TEXT);

The predictions are created like this:

PipelineModel fittedPipeline = pipeline.fit(dataset);

Dataset<Row> prediction = fittedPipeline.transform(dataset);

We noticed that the performance isn't quite as good as expected. After 
profiling the application with VisualVM, I noticed that the problem is 
with org.apache.spark.sql.Encoders.STRING() in the creation of the 
dataset, which by itself takes up about 75% of the time for the whole 
prediction method call.


So, is there a simpler and more efficient way of creating the required 
dataset, consisting of one column and one String row?


Thanks a lot.

Cheers,

Martin

Encoders.STRING() causing performance problems in Java application

2022-02-17 Thread martin



Hello,

I am working on optimising the performance of a Java ML/NLP application 
based on Spark / SparkNLP. For prediction, I am applying a trained model 
on a Spark dataset which consists of one column with only one row. The 
dataset is created like this:


List<String> textList = Collections.singletonList(text);
Dataset<Row> data = sparkSession
    .createDataset(textList, Encoders.STRING())
    .withColumnRenamed(COL_VALUE, COL_TEXT);

The predictions are created like this:

PipelineModel fittedPipeline = pipeline.fit(dataset);

Dataset<Row> prediction = fittedPipeline.transform(dataset);

We noticed that the performance isn't quite as good as expected. After 
profiling the application with VisualVM, I noticed that the problem is 
with org.apache.spark.sql.Encoders.STRING() in the creation of the 
dataset, which by itself takes up about 75% of the time for the whole 
prediction method call.


So, is there a simpler and more efficient way of creating the required 
dataset, consisting of one column and one String row?


Thanks a lot.

Cheers,

Martin

Re: how can I remove the warning message

2022-02-04 Thread Martin Grigorov
Hi,

This is a JVM warning, as Sean explained. You cannot control it via loggers.
You can disable it by passing --illegal-access=permit to java.
Read more about it at
https://softwaregarden.dev/en/posts/new-java/illegal-access-in-java-16/


On Sun, Jan 30, 2022 at 4:32 PM Sean Owen  wrote:

> This one you can ignore. It's from the JVM so you might be able to disable
> it by configuring the right JVM logger as well, but it also tells you right
> in the message how to turn it off!
>
> But this is saying that some reflective operations are discouraged in Java
> 9+. They still work and Spark needs them, but they cause a warning now. You
> can however ignore it.
>
> On Sun, Jan 30, 2022 at 2:56 AM Gourav Sengupta 
> wrote:
>
>> Hi,
>>
>> I have often found that logging in the warnings is extremely useful, they
>> are just logs, and provide a lot of insights during upgrades, external
>> package loading, deprecation, debugging, etc.
>>
>> Do you have any particular reason to disable the warnings in a submitted
>> job?
>>
>> I used to disable warnings in spark-shell  using the
>> Logger.getLogger("akka").setLevel(Level.OFF) in case I have not completely
>> forgotten. Other details are mentioned here:
>> https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.setLogLevel.html
>>
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Fri, Jan 28, 2022 at 11:14 AM  wrote:
>>
>>> When I submitted the job from scala client, I got the warning messages:
>>>
>>> WARNING: An illegal reflective access operation has occurred
>>> WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform
>>> (file:/opt/spark/jars/spark-unsafe_2.12-3.2.0.jar) to constructor
>>> java.nio.DirectByteBuffer(long,int)
>>> WARNING: Please consider reporting this to the maintainers of
>>> org.apache.spark.unsafe.Platform
>>> WARNING: Use --illegal-access=warn to enable warnings of further illegal
>>> reflective access operations
>>> WARNING: All illegal access operations will be denied in a future
>>> release
>>>
>>> How can I just remove those messages?
>>>
>>> spark: 3.2.0
>>> scala: 2.13.7
>>>
>>> Thank you.
>>>
>>>
>>>


Re: Spark 3.1 Json4s-native jar compatibility

2022-02-04 Thread Martin Grigorov
Hi,

Amit said that he uses Spark 3.1, so the link should be
https://github.com/apache/spark/blob/branch-3.1/pom.xml#L879 (3.7.0-M5)

@Amit: check your classpath. Maybe there are more jars of this dependency.
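
One way to run that classpath check is to group jar file names by artifact and look for artifacts that appear with more than one version. The sketch below is a rough helper, not an official tool: `conflicting_jars` is a made-up name, the file-name pattern is a heuristic for the usual `artifact-version.jar` layout, and the jar names in the demo are illustrative.

```python
import os
import re
from collections import defaultdict

# Heuristic: artifact name, a dash, then a version that starts with a digit.
JAR_RE = re.compile(r"^(?P<artifact>.+?)-(?P<version>\d[\w.\-]*)\.jar$")

def conflicting_jars(jar_paths):
    """Map each artifact that occurs with several versions to those versions."""
    versions = defaultdict(set)
    for path in jar_paths:
        match = JAR_RE.match(os.path.basename(path))
        if match:
            versions[match.group("artifact")].add(match.group("version"))
    return {name: sorted(found) for name, found in versions.items() if len(found) > 1}

# Illustrative classpath entries; in practice these would come from the
# application's lib folder or from the jars directory of the Spark install.
classpath = [
    "/opt/app/lib/json4s-native_2.12-3.7.0-M5.jar",
    "/opt/app/lib/json4s-native_2.12-3.6.6.jar",
    "/opt/app/lib/spark-core_2.12-3.1.2.jar",
]
print(conflicting_jars(classpath))
```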

On Thu, Feb 3, 2022 at 10:53 PM Sean Owen  wrote:

> You can look it up:
> https://github.com/apache/spark/blob/branch-3.2/pom.xml#L916
> 3.7.0-M11
>
> On Thu, Feb 3, 2022 at 1:57 PM Amit Sharma  wrote:
>
>> Hello, everyone. I am migrating my spark stream to spark version 3.1. I
>> also upgraded  json version  as below
>>
>> libraryDependencies += "org.json4s" %% "json4s-native" % "3.7.0-M5"
>>
>>
>> While running the job I getting an error for the below code where I am
>> serializing the given inputs.
>>
>> implicit val formats = 
>> Serialization.formats(ShortTypeHints(List(classOf[ForecastResponse], 
>> classOf[OverlayRequest],
>>   classOf[FTEResponseFromSpark], classOf[QuotaResponse], 
>> classOf[CloneResponse]
>>
>> )))
>>
>>
>> Exception in thread "streaming-job-executor-4" java.lang.NoSuchMethodError: 
>> org.json4s.ShortTypeHints$.apply$default$2()Ljava/lang/String;
>>
>> It seems to me jar issue, not sure which version of json4s-native should I 
>> use with spark 3.1.
>>
>>


Re: [EXTERNAL] Fwd: Log4j upgrade in spark binary from 1.2.17 to 2.17.1

2022-01-31 Thread Martin Grigorov
Hi,

On Mon, Jan 31, 2022 at 7:57 PM KS, Rajabhupati
 wrote:

> Thanks a lot Sean. One final question before I close the conversation: how do
> we know which features will be added as part of Spark 3.3?
>

There will be release notes for 3.3 linked at
https://spark.apache.org/downloads.html#release-notes-for-stable-releases
once it is released.


>
> Regards
> Rajabhupati
> --
> *From:* Sean Owen 
> *Sent:* Monday, January 31, 2022 10:50:16 PM
> *To:* KS, Rajabhupati 
> *Cc:* user@spark.apache.org 
> *Subject:* Re: [EXTERNAL] Fwd: Log4j upgrade in spark binary from 1.2.17
> to 2.17.1
>
> https://spark.apache.org/versioning-policy.html
> 
>
> On Mon, Jan 31, 2022 at 11:15 AM KS, Rajabhupati <
> rajabhupati...@comcast.com> wrote:
>
> Thanks Sean. When is Spark 3.3.0 expected to be released?
>
>
>
> Regards
>
> Raja
>
> *From:* Sean Owen 
> *Sent:* Monday, January 31, 2022 10:28 PM
> *To:* KS, Rajabhupati 
> *Subject:* [EXTERNAL] Fwd: Log4j upgrade in spark binary from 1.2.17 to
> 2.17.1
>
>
>
> Further, you're using an email that can't receive email ...
>
> -- Forwarded message -
> From: *Sean Owen* 
> Date: Mon, Jan 31, 2022 at 10:56 AM
> Subject: Re: Log4j upgrade in spark binary from 1.2.17 to 2.17.1
> To: KS, Rajabhupati 
> Cc: u...@spark.incubator.apache.org ,
> d...@spark.incubator.apache.org 
>
>
>
> (BTW you are sending to the Spark incubator list, and Spark has not been
> in incubation for about 7 years. Use user@spark.apache.org)
>
>
>
> What update are you looking for? this has been discussed extensively on
> the Spark mailing list.
>
> Spark is not evidently vulnerable to this. 3.3.0 will include log4j 2.17
> anyway.
>
>
>
> The ticket you cite points you to the correct ticket:
> https://issues.apache.org/jira/browse/SPARK-6305
> 
>
>
>
> On Mon, Jan 31, 2022 at 10:53 AM KS, Rajabhupati <
> rajabhupati...@comcast.com.invalid> wrote:
>
> Hi Team ,
>
>
>
> Is there any update on this request ?
>
>
>
> We did see Jira https://issues.apache.org/jira/browse/SPARK-37630
> 
> for this request but we see it closed .
>
>
>
> Regards
>
> Raja
>
>
>
> *From:* KS, Rajabhupati 
> *Sent:* Sunday, January 30, 2022 9:03 AM
> *To:* u...@spark.incubator.apache.org
> *Subject:* Log4j upgrade in spark binary from 1.2.17 to 2.17.1
>
>
>
> Hi Team,
>
>
>
> We were checking for a log4j upgrade in open-source Spark to avoid
> the recent vulnerability in the Spark binary. Is there any new release
> planned that upgrades log4j from 1.2.17 to 2.17.1? A prompt
> response is appreciated.
>
>
>
>
>
> Regards
>
> Rajabhupati
>
>


Re: Log4j 1.2.17 spark CVE

2021-12-13 Thread Martin Wunderlich
There is a discussion on Github on this topic and the recommendation is 
to upgrade from 1.x to 2.15.0, due to the vulnerability of 1.x: 
https://github.com/apache/logging-log4j2/pull/608


This discussion is also referenced by the German Federal Office for 
Information Security: https://www.bsi.bund.de/EN/Home/home_node.html


Cheers,

Martin

Am 13.12.21 um 17:02 schrieb Jörn Franke:
Is it in any case appropriate to use log4j 1.x which is not maintained 
anymore and has other security vulnerabilities which won’t be fixed 
anymore ?



Am 13.12.2021 um 06:06 schrieb Sean Owen :


Check the CVE - the log4j vulnerability appears to affect log4j 2, 
not 1.x. There was mention that it could affect 1.x when used with 
JNDI or JMS handlers, but Spark does neither. (unless anyone can 
think of something I'm missing, but never heard or seen that come up 
at all in 7 years in Spark)


The big issue would be applications that themselves configure log4j 
2.x, but that's not a Spark issue per se.


On Sun, Dec 12, 2021 at 10:46 PM Pralabh Kumar 
 wrote:


Hi developers, users

Spark is built using log4j 1.2.17. Is there a plan to upgrade
based on the recently detected CVE?


Regards
Pralabh kumar


Re: [Spark] Does Spark support backward and forward compatibility?

2021-11-24 Thread Martin Wunderlich

Hi Amin,

This might be only marginally relevant to your question, but in my 
project I also noticed the following: The trained and exported Spark 
models (i.e. pipelines saved to binary files) are also not compatible 
between versions, at least between major versions. I noticed this when 
trying to load a model built with Spark 2.4.4 after updating to 3.2.0. 
This didn't work.


Cheers,

Martin

On 24.11.21 at 20:18, Sean Owen wrote:
I think/hope that it goes without saying you can't mix Spark versions 
within a cluster.
Forwards compatibility is something you don't generally expect as a 
default from any piece of software, so not sure there is something to 
document explicitly.
Backwards compatibility is important, and this is documented 
extensively where it doesn't hold in the Spark docs and release notes.



On Wed, Nov 24, 2021 at 1:16 PM Amin Borjian 
 wrote:


Thank you very much for your reply. It would be great if
these points were mentioned in the Spark documentation (for example, on
the download page or somewhere similar).

If I understand correctly, we can compile the client
(for example in Java) against a newer version (for example 3.2.0)
and run it against an older server within the same major version (for
example 3.1.x) without problems in most cases. Am I
right? (Because backward compatibility can be
described from both the server's and the client's point of view, I am
repeating the statement to make sure I got it right.)

But what happens if we update the server to 3.2.x while our client is
still on version 3.1.x? Can the client work with the newer cluster
version, given that it only uses older features of the server? (Maybe
this is what you meant, and my previous statement was wrong because I
misunderstood.)

*From: *Sean Owen <mailto:sro...@gmail.com>
*Sent: *Wednesday, November 24, 2021 5:38 PM
*To: *Amin Borjian <mailto:borjianami...@outlook.com>
*Cc: *user@spark.apache.org
*Subject: *Re: [Spark] Does Spark support backward and forward
compatibility?

Can you mix different Spark versions on driver and executor? No.

Can you compile against a different version of Spark than you run
on? That typically works within a major release, though forwards
compatibility may not work (you can't use a feature that doesn't
exist in the version on the cluster). Compiling vs 3.2.0 and
running on 3.1.x for example should work fine in 99% of cases.
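
For illustration only, the rule of thumb above can be written down as a small plain-Java check. This is a sketch under assumptions: SparkVersionCheck, Verdict and check() are hypothetical names and not part of Spark, and the verdicts only encode the guidance given in this thread (within a major release things typically work, but code compiled against a newer minor version must not use features that are missing on the cluster).

```java
// Hypothetical helper that encodes the thread's rule of thumb; not a Spark API.
final class SparkVersionCheck {

    enum Verdict { SAME_VERSION, LIKELY_OK, AVOID_NEWER_FEATURES, CROSS_MAJOR }

    // Parses "major.minor[.patch]" and returns {major, minor}; the patch part is ignored.
    static int[] majorMinor(String version) {
        String[] parts = version.split("\\.");
        return new int[] { Integer.parseInt(parts[0]), Integer.parseInt(parts[1]) };
    }

    // compiledAgainst: Spark version of the application's dependencies.
    // cluster: Spark version installed on the cluster.
    static Verdict check(String compiledAgainst, String cluster) {
        if (compiledAgainst.equals(cluster)) {
            return Verdict.SAME_VERSION;
        }
        int[] compiled = majorMinor(compiledAgainst);
        int[] running = majorMinor(cluster);
        if (compiled[0] != running[0]) {
            return Verdict.CROSS_MAJOR;            // no compatibility expectation across majors
        }
        if (compiled[1] > running[1]) {
            return Verdict.AVOID_NEWER_FEATURES;   // usually works unless newer features are used
        }
        return Verdict.LIKELY_OK;                  // older or equal minor on a newer cluster
    }

    public static void main(String[] args) {
        System.out.println(check("3.2.0", "3.1.2"));
        System.out.println(check("3.1.2", "3.2.0"));
        System.out.println(check("2.4.4", "3.2.0"));
    }
}
```

None of this replaces reading the release notes; it only makes the three cases discussed here explicit.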

On Wed, Nov 24, 2021 at 8:04 AM Amin Borjian
 wrote:

I have a simple question about using Spark. Most tools
answer this question explicitly (in a prominent place, such as
a highlighted note or a separate page), but
I did not find it anywhere for Spark. Maybe I did not search enough,
but I thought it would be good to ask here in the hope
that the answer will benefit other people as well.

Spark binary is usually downloaded from the following link and
installed and configured on the cluster: Download Apache Spark
<https://spark.apache.org/downloads.html>

If, for example, we use the Java language for programming
(although it can be other supported languages), we need the
following dependencies to communicate with Spark:

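<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.2.0</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.2.0</version>
</dependency>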

As is clear, both the Spark cluster (binary of Spark) and the
dependencies used on the application side have a specific
version. In my opinion, it is obvious that if the version used
is the same on both the application side and the server side,
everything will most likely work in its ideal state without
any problems.

But the question is, what if the two versions are not the
same? Is it possible to have compatibility between the server
and the application in specific number of conditions (such as
not changing major version)? Or, for example, if the client is
always ahead, is it not a problem? Or if the server is always
ahead, is it not a problem?

The argument is that there may be a library that I did not write
and it is an old version, but I want to update my cluster (server
version). Or it may not be possible for me to update the server
version and all the applications version at the same time, so I
want to update each one separately. As a result, the
application-server version differs in a period of time. (maybe
short or long period) I want to know exactly how Spark works in
this situation.


Re: EXT: Re: Create Dataframe from a single String in Java

2021-11-18 Thread martin
Thanks a lot, Sebastian and Vibhor. You're right, I can call the 
createDataset() also on the Spark session. Not sure how I missed that.


Cheers,

Martin

On 2021-11-18 12:01, Vibhor Gupta wrote:

You can try something like below. It creates a dataset and then 
converts it into a dataframe.


sparkSession.createDataset(
        Arrays.asList("apple", "orange", "banana"),
        Encoders.STRING()
    ).toDF("fruits").show();

Regards,
Vibhor Gupta.

-

From: Sebastian Piu 
Sent: Thursday, November 18, 2021 4:20 PM
To: mar...@wunderlich.com 
Cc: user 
Subject: EXT: Re: Create Dataframe from a single String in Java

EXTERNAL: Report suspicious emails to Email Abuse.

You can call that on sparkSession too

On Thu, 18 Nov 2021, 10:48 ,  wrote:

PS: The following works, but it seems rather awkward having to use the 
SQLContext here.


SQLContext sqlContext = new SQLContext(sparkContext);

Dataset<Row> data = sqlContext
    .createDataset(textList, Encoders.STRING())
    .withColumnRenamed("value", "text");

On 2021-11-18 11:26, mar...@wunderlich.com wrote:

Hello,

I am struggling with a task that should be super simple: I would like 
to create a Spark DataFrame of type Dataset<Row> with one column from a 
single String (or from a one-element List of Strings). The column header 
should be "text".


SparkContext.parallelize() does not work, because it returns an RDD and 
not a Dataset, and it takes a "ClassTag" as its 3rd parameter.


I am able to convert a List of Strings to a JavaRDD<String> using this:
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkContext);

JavaRDD<String> javaRdd = javaSparkContext.parallelize(textList);

But then I am stuck with this javaRDD. Besides, it seems overly complex 
having to create an intermediate representation.


There is also this SO post with a solution in Scala that I have not 
been able to convert to Java, because the APIs differ:


https://stackoverflow.com/questions/44028677/how-to-create-a-dataframe-from-a-string

Basically, what I am looking for is something simple like:

Dataset<Row> myData = sparkSession.createDataFrame(textList, "text");

Any hints? Thanks a lot.

Cheers,

Martin

Re: Create Dataframe from a single String in Java

2021-11-18 Thread martin
PS: The following works, but it seems rather awkward having to use the 
SQLContext here.


SQLContext sqlContext = new SQLContext(sparkContext);

Dataset<Row> data = sqlContext
  .createDataset(textList, Encoders.STRING())
  .withColumnRenamed("value", "text");

On 2021-11-18 11:26, mar...@wunderlich.com wrote:


Hello,

I am struggling with a task that should be super simple: I would like 
to create a Spark DataFrame of type Dataset<Row> with one column from a 
single String (or from a one-element List of Strings). The column header 
should be "text".


SparkContext.parallelize() does not work, because it returns an RDD and 
not a Dataset, and it takes a "ClassTag" as its 3rd parameter.


I am able to convert a List of Strings to a JavaRDD<String> using this:
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkContext);

JavaRDD<String> javaRdd = javaSparkContext.parallelize(textList);

But then I am stuck with this javaRDD. Besides, it seems overly complex 
having to create an intermediate representation.


There is also this SO post with a solution in Scala that I have not 
been able to convert to Java, because the APIs differ:


https://stackoverflow.com/questions/44028677/how-to-create-a-dataframe-from-a-string

Basically, what I am looking for is something simple like:

Dataset<Row> myData = sparkSession.createDataFrame(textList, "text");

Any hints? Thanks a lot.

Cheers,

Martin

Create Dataframe from a single String in Java

2021-11-18 Thread martin

Hello,

I am struggling with a task that should be super simple: I would like to 
create a Spark DataFrame of type Dataset<Row> with one column from a 
single String (or from a one-element List of Strings). The column header 
should be "text".


SparkContext.parallelize() does not work, because it returns an RDD and 
not a Dataset, and it takes a "ClassTag" as its 3rd parameter.


I am able to convert a List of Strings to a JavaRDD<String> using this:
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkContext);

JavaRDD<String> javaRdd = javaSparkContext.parallelize(textList);

But then I am stuck with this javaRDD. Besides, it seems overly complex 
having to create an intermediate representation.


There is also this SO post with a solution in Scala that I have not been 
able to convert to Java, because the APIs differ:


https://stackoverflow.com/questions/44028677/how-to-create-a-dataframe-from-a-string

Basically, what I am looking for is something simple like:

Dataset<Row> myData = sparkSession.createDataFrame(textList, "text");


Any hints? Thanks a lot.

Cheers,

Martin

Re: Using MulticlassClassificationEvaluator for NER evaluation

2021-11-11 Thread martin
OK, thank you, Gourav. I didn't realize that Spark works with numerical 
formats only by design.


What I am trying to achieve is rather straightforward: Evaluate a 
trained model using the standard metrics provided by 
MulticlassClassificationEvaluator. Since this isn't possible for text 
labels, we'll need to work around it and possibly create a wrapper 
evaluator around the Spark standard class.
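
To make the workaround concrete, here is a minimal plain-Java sketch of the label conversion such a wrapper would have to do: map each string label to a double index and then compare gold and predicted indices. It assumes one label per token; LabelIndexer, fit() and accuracy() are hypothetical names, not Spark classes. The ordering (most frequent label gets 0.0, ties broken alphabetically) is meant to mimic what I understand to be the default behaviour of Spark ML's StringIndexer, which would be the usual tool for this step inside a pipeline.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: turns string labels into double indices and scores predictions.
final class LabelIndexer {

    // Assigns 0.0, 1.0, ... to labels, most frequent first; ties are broken alphabetically.
    static Map<String, Double> fit(List<String> labels) {
        Map<String, Integer> counts = new HashMap<>();
        for (String label : labels) {
            counts.merge(label, 1, Integer::sum);
        }
        List<String> ordered = new ArrayList<>(counts.keySet());
        ordered.sort((a, b) -> {
            int byCount = Integer.compare(counts.get(b), counts.get(a));
            return byCount != 0 ? byCount : a.compareTo(b);
        });
        Map<String, Double> index = new HashMap<>();
        for (int i = 0; i < ordered.size(); i++) {
            index.put(ordered.get(i), (double) i);
        }
        return index;
    }

    // Fraction of positions where gold and predicted labels map to the same index.
    // A predicted label that never occurs in the gold data counts as wrong.
    static double accuracy(List<String> gold, List<String> predicted, Map<String, Double> index) {
        int correct = 0;
        for (int i = 0; i < gold.size(); i++) {
            Double g = index.get(gold.get(i));
            Double p = index.get(predicted.get(i));
            if (g != null && g.equals(p)) {
                correct++;
            }
        }
        return (double) correct / gold.size();
    }

    public static void main(String[] args) {
        List<String> gold = Arrays.asList("O", "B-PER", "I-PER", "O", "B-LOC", "O");
        List<String> predicted = Arrays.asList("O", "B-PER", "O", "O", "B-LOC", "B-LOC");
        Map<String, Double> index = fit(gold);
        System.out.println(index);
        System.out.println(accuracy(gold, predicted, index));
    }
}
```

With the labels in numeric form, the two double columns can be handed to MulticlassClassificationEvaluator as label and prediction columns.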


Thanks a lot for the help.

Cheers,

Martin

On 2021-11-11 13:10, Gourav Sengupta wrote:


Hi Martin,

okay, so you will of course need to translate the NER string output to a 
numerical format, as you would do with any text data before feeding it 
to Spark ML. Please read the Spark ML documentation on this. I think 
it is quite clear on how to do that.
But more importantly, please try to answer Sean's question. Explaining 
what you are trying to achieve, and how, always helps.


Regards,
Gourav Sengupta

On Thu, Nov 11, 2021 at 11:03 AM Martin Wunderlich 
 wrote:


Hi Gourav,

Mostly correct. The output of SparkNLP here is a trained 
pipeline/model/transformer. I am feeding this trained pipeline to the 
MulticlassClassificationEvaluator for evaluation, and this 
MulticlassClassificationEvaluator only accepts floats or doubles as 
the labels (instead of NER labels).


Cheers,

Martin

On 11.11.21 at 11:39, Gourav Sengupta wrote:
Hi Martin,

just to confirm, you are taking the output of SPARKNLP, and then trying 
to feed it to SPARK ML for running algorithms on the output of 
NER generated by SPARKNLP, right?


Regards,
Gourav Sengupta

On Thu, Nov 11, 2021 at 8:00 AM  wrote:

Hi Sean,

Apologies for the delayed reply. I've been away on vacation and then 
busy catching up afterwards.


Regarding the evaluation using MulticlassClassificationEvaluator: This 
is about a sequence labeling task to identify specific non-standard 
named entities. The training and evaluation data is in CoNLL format. 
The training works all fine, using the categorical labels for the NEs. 
In order to use the MulticlassClassificationEvaluator, however, I need 
to convert these to floats. This is possible and also works fine, it is 
just inconvenient having to do the extra step. I would have expected 
the MulticlassClassificationEvaluator to be able to use the labels 
directly.


I will try to create and propose a code change in this regard, if or 
when I find the time.


Cheers,

Martin

On 2021-10-25 14:31, Sean Owen wrote:

I don't think the question is representation as double. The question is 
how this output represents a label? This looks like the result of an 
annotator. What are you classifying? you need, first, ground truth and 
prediction somewhere to use any utility to assess classification 
metrics.


On Mon, Oct 25, 2021 at 5:42 AM  wrote:

Hello,

I am using SparkNLP to do some NER. The result datastructure after 
training and classification is a Dataset, with one column each for 
labels and predictions. For evaluating the model, I would like to use 
the Spark ML class 
org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator. 
However, this evaluator expects labels as double numbers. In the case 
of an NER task, the results in my case are of type 
array,embeddings:array>>.


It would be possible, of course, to convert this format to the required 
doubles. But is there a way to easily apply 
MulticlassClassificationEvaluator to the NER task or is there maybe a 
better evaluator? I haven't found anything yet (neither in Spark ML nor 
in SparkNLP).


Thanks a lot.

Cheers,

Martin

Re: Using MulticlassClassificationEvaluator for NER evaluation

2021-11-11 Thread Martin Wunderlich

Hi Gourav,

Mostly correct. The output of SparkNLP here is a trained 
pipeline/model/transformer. I am feeding this trained pipeline to the 
MulticlassClassificationEvaluator for evaluation, and this 
MulticlassClassificationEvaluator only accepts floats or doubles as the 
labels (instead of NER labels).


Cheers,

Martin

On 11.11.21 at 11:39, Gourav Sengupta wrote:

Hi Martin,

just to confirm, you are taking the output of SPARKNLP, and then 
trying to feed it to SPARK ML for running algorithms on the output of 
NER generated by SPARKNLP, right?



Regards,
Gourav Sengupta

On Thu, Nov 11, 2021 at 8:00 AM  wrote:

Hi Sean,

Apologies for the delayed reply. I've been away on vacation and
then busy catching up afterwards.

Regarding the evaluation using MulticlassClassificationEvaluator:
This is about a sequence labeling task to identify specific
non-standard named entities. The training and evaluation data is
in CoNLL format. The training works all fine, using the
categorical labels for the NEs. In order to use the
MulticlassClassificationEvaluator, however, I need to convert
these to floats. This is possible and also works fine, it is just
inconvenient having to do the extra step. I would have expected
the MulticlassClassificationEvaluator to be able to use the labels
directly.

I will try to create and propose a code change in this regard, if
or when I find the time.

Cheers,

Martin


On 2021-10-25 14:31, Sean Owen wrote:


I don't think the question is representation as double. The
question is how this output represents a label? This looks like
the result of an annotator. What are you classifying? you need,
first, ground truth and prediction somewhere to use any utility
to assess classification metrics.

On Mon, Oct 25, 2021 at 5:42 AM  wrote:

Hello,

I am using SparkNLP to do some NER. The result datastructure
after training and classification is a Dataset, with one
column each for labels and predictions. For evaluating the
model, I would like to use the Spark ML class
org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator.
However, this evaluator expects labels as double numbers. In
the case of an NER task, the results in my case are of type

array,embeddings:array>>.


It would be possible, of course, to convert this format to
the required doubles. But is there a way to easily apply
MulticlassClassificationEvaluator to the NER task or is there
maybe a better evaluator? I haven't found anything yet
(neither in Spark ML nor in SparkNLP).

Thanks a lot.

Cheers,

    Martin


Re: Feature (?): Setting custom parameters for a Spark MLlib pipeline

2021-11-11 Thread martin
Yes, that would be a suitable option. We could just extend the standard 
Spark MLlib Transformer and add the required meta-data.


Just out of curiosity: Is there a specific reason why the user of a 
standard Transformer would not be able to add arbitrary key-value pairs 
for additional meta-data? This could be handy not just for things 
like versioning, but also for storing evaluation metrics together with a 
trained pipeline (for people who aren't using something like MLflow 
yet).


Cheers,

Martin

On 2021-10-25 14:38, Sean Owen wrote:


You can write a custom Transformer or Estimator?

On Mon, Oct 25, 2021 at 7:37 AM Sonal Goyal  
wrote:

Hi Martin,

Agree, if you don't need the other features of MLFlow then it is likely 
overkill.


Cheers,
Sonal
https://github.com/zinggAI/zingg

On Mon, Oct 25, 2021 at 4:06 PM  wrote:

Hi Sonal,

Thanks a lot for this suggestion. I presume it might indeed be possible 
to use MLFlow for this purpose, but at present it seems a bit too much 
to introduce another framework only for storing arbitrary meta-data 
with trained ML pipelines. I was hoping there might be a way to do this 
natively in Spark ML. Otherwise, I'll just create a wrapper class for 
the trained models.


Cheers,

Martin

On 2021-10-24 21:16, Sonal Goyal wrote:

Does MLFlow help you? https://mlflow.org/

I don't know if ML flow can save arbitrary key-value pairs and 
associate them with a model, but versioning and evaluation etc are 
supported.


Cheers,
Sonal
https://github.com/zinggAI/zingg

On Wed, Oct 20, 2021 at 12:59 PM  wrote:

Hello,

This is my first post to this list, so I hope I won't violate any 
(un)written rules.


I recently started working with SparkNLP for a larger project. SparkNLP 
in turn is based on Apache Spark's MLlib. One thing I found missing is the 
ability to store custom parameters in a Spark pipeline. It seems only 
certain pre-configured parameter values are allowed (e.g. "stages" for 
the Pipeline class).


IMHO, it would be handy to be able to store custom parameters, e.g. for 
model versions or other meta-data, so that these parameters are stored 
with a trained pipeline, for instance. This could also be used to 
include evaluation results, such as accuracy, with trained ML models.


(I also asked this on Stackoverflow, but didn't get a response, yet: 
https://stackoverflow.com/questions/69627820/setting-custom-parameters-for-a-spark-mllib-pipeline)


What does the community think about this proposal? Has it been 
discussed before perhaps? Any thoughts?


Cheers,

Martin

Re: Using MulticlassClassificationEvaluator for NER evaluation

2021-11-10 Thread martin

Hi Sean,

Apologies for the delayed reply. I've been away on vacation and then 
busy catching up afterwards.


Regarding the evaluation using MulticlassClassificationEvaluator: This is 
about a sequence labeling task to identify specific non-standard named 
entities. The training and evaluation data is in CoNLL format. The 
training works all fine, using the categorical labels for the NEs. In 
order to use the MulticlassClassificationEvaluator, however, I need to 
convert these to floats. This is possible and also works fine, it is 
just inconvenient having to do the extra step. I would have expected the 
MulticlassClassificationEvaluator to be able to use the labels directly.


I will try to create and propose a code change in this regard, if or 
when I find the time.


Cheers,

Martin

On 2021-10-25 14:31, Sean Owen wrote:

I don't think the question is representation as double. The question is 
how this output represents a label? This looks like the result of an 
annotator. What are you classifying? you need, first, ground truth and 
prediction somewhere to use any utility to assess classification 
metrics.


On Mon, Oct 25, 2021 at 5:42 AM  wrote:


Hello,

I am using SparkNLP to do some NER. The result datastructure after 
training and classification is a Dataset, with one column each 
for labels and predictions. For evaluating the model, I would like to 
use the Spark ML class 
org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator. 
However, this evaluator expects labels as double numbers. In the case 
of an NER task, the results in my case are of type 
array,embeddings:array>>.


It would be possible, of course, to convert this format to the 
required doubles. But is there a way to easily apply 
MulticlassClassificationEvaluator to the NER task or is there maybe a 
better evaluator? I haven't found anything yet (neither in Spark ML 
nor in SparkNLP).


Thanks a lot.

Cheers,

Martin

Using MulticlassClassificationEvaluator for NER evaluation

2021-10-25 Thread martin

Hello,

I am using SparkNLP to do some NER. The result datastructure after 
training and classification is a Dataset, with one column each for 
labels and predictions. For evaluating the model, I would like to use 
the Spark ML class 
org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator. 
However, this evaluator expects labels as double numbers. In the case of 
an NER task, the results in my case are of type 
array,embeddings:array>>.


It would be possible, of course, to convert this format to the required 
doubles. But is there a way to easily apply 
MulticlassClassificationEvaluator to the NER task or is there maybe a 
better evaluator? I haven't found anything yet (neither in Spark ML nor 
in SparkNLP).


Thanks a lot.

Cheers,

Martin

Re: Feature (?): Setting custom parameters for a Spark MLlib pipeline

2021-10-25 Thread martin

Hi Sonal,

Thanks a lot for this suggestion. I presume it might indeed be possible 
to use MLFlow for this purpose, but at present it seems a bit too much 
to introduce another framework only for storing arbitrary meta-data with 
trained ML pipelines. I was hoping there might be a way to do this 
natively in Spark ML. Otherwise, I'll just create a wrapper class for 
the trained models.
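
As a rough idea of what such a wrapper class could look like, here is a minimal plain-Java sketch. It is an assumption-laden illustration, not a Spark ML feature: ModelWithMetadata and its methods are hypothetical names, the type parameter M stands in for whatever trained model or pipeline is wrapped, and the metadata is serialized on its own via java.util.Properties so that it could be written to a small file next to the saved pipeline.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical wrapper: keeps arbitrary key-value metadata alongside a trained model.
final class ModelWithMetadata<M> {

    private final M model;
    private final Properties metadata = new Properties();

    ModelWithMetadata(M model) {
        this.model = model;
    }

    M model() {
        return model;
    }

    // Adds or overwrites one metadata entry; returns this for chaining.
    ModelWithMetadata<M> put(String key, String value) {
        metadata.setProperty(key, value);
        return this;
    }

    String get(String key) {
        return metadata.getProperty(key);
    }

    // Serializes only the metadata; the model itself is saved by its own mechanism.
    String metadataToString() {
        try {
            StringWriter out = new StringWriter();
            metadata.store(out, "pipeline metadata");
            return out.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Restores metadata previously produced by metadataToString().
    void loadMetadata(String text) {
        try {
            metadata.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ModelWithMetadata<String> saved = new ModelWithMetadata<>("stand-in for a trained pipeline")
                .put("modelVersion", "1.3.0")
                .put("accuracy", "0.91");
        ModelWithMetadata<String> restored = new ModelWithMetadata<>("stand-in for a trained pipeline");
        restored.loadMetadata(saved.metadataToString());
        System.out.println(restored.get("modelVersion") + " / " + restored.get("accuracy"));
    }
}
```

The values used in main() are made up for the example; a real wrapper would store whatever version tag and evaluation results the project needs.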


Cheers,

Martin

On 2021-10-24 21:16, Sonal Goyal wrote:


Does MLFlow help you? https://mlflow.org/

I don't know if ML flow can save arbitrary key-value pairs and 
associate them with a model, but versioning and evaluation etc are 
supported.


Cheers,
Sonal
https://github.com/zinggAI/zingg

On Wed, Oct 20, 2021 at 12:59 PM  wrote:


Hello,

This is my first post to this list, so I hope I won't violate any 
(un)written rules.


I recently started working with SparkNLP for a larger project. 
SparkNLP in turn is based on Apache Spark's MLlib. One thing I found 
missing is the ability to store custom parameters in a Spark pipeline. 
It seems only certain pre-configured parameter values are allowed 
(e.g. "stages" for the Pipeline class).


IMHO, it would be handy to be able to store custom parameters, e.g. 
for model versions or other meta-data, so that these parameters are 
stored with a trained pipeline, for instance. This could also be used 
to include evaluation results, such as accuracy, with trained ML 
models.


(I also asked this on Stackoverflow, but didn't get a response, yet: 
https://stackoverflow.com/questions/69627820/setting-custom-parameters-for-a-spark-mllib-pipeline)


What does the community think about this proposal? Has it been 
discussed before perhaps? Any thoughts?


Cheers,

Martin

Feature (?): Setting custom parameters for a Spark MLlib pipeline

2021-10-20 Thread martin

Hello,

This is my first post to this list, so I hope I won't violate any 
(un)written rules.


I recently started working with SparkNLP for a larger project. SparkNLP 
in turn is based on Apache Spark's MLlib. One thing I found missing is the 
ability to store custom parameters in a Spark pipeline. It seems only 
certain pre-configured parameter values are allowed (e.g. "stages" for 
the Pipeline class).


IMHO, it would be handy to be able to store custom parameters, e.g. for 
model versions or other meta-data, so that these parameters are stored 
with a trained pipeline, for instance. This could also be used to 
include evaluation results, such as accuracy, with trained ML models.


(I also asked this on Stackoverflow, but didn't get a response, yet: 
https://stackoverflow.com/questions/69627820/setting-custom-parameters-for-a-spark-mllib-pipeline)


What does the community think about this proposal? Has it been 
discussed before perhaps? Any thoughts?


Cheers,

Martin

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Chris Martin
Hmm, then my guesses are (in order of decreasing probability):

* Whatever class makes up fetchedRdd (MemexDeepCrawlDbRDD?) isn't
compatible with the latest Spark release.
* You've got 16 threads per task on a 16-core machine. That should be fine,
but I wonder if it's confusing things, as you don't also set
spark.executor.cores and Databricks might default that to 1.
* There's some custom partitioner in play which is causing everything to go
to the same partition.
* The group keys are all hashing to the same value (it's difficult to see
how this would be the case if the group keys are genuinely different, but
maybe there's something else going on).

My hints:

1. Make sure you're using a recent version of Sparkler.
2. Try repartition with a custom partitioner that you know will send things
to different partitions.
3. Try either removing "spark.task.cpus":"16" or setting
spark.executor.cores to 1.
4. Print out the group keys and see if there's any weird pattern to them.
5. See if the same thing happens in Spark local mode.

If you have a reproducible example you can post publicly, then I'm happy
to take a look.

Chris

On Wed, Jun 9, 2021 at 5:17 PM Tom Barber  wrote:

> Yeah to test that I just set the group key to the ID in the record which
> is a solr supplied UUID, which means effectively you end up with 4000
> groups now.
>
> On Wed, Jun 9, 2021 at 5:13 PM Chris Martin  wrote:
>
>> One thing I would check is this line:
>>
>> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>
>> how many distinct groups do you end up with?  If there's just one then
>> I think you might see the behaviour you observe.
>>
>> Chris
>>
>>
>> On Wed, Jun 9, 2021 at 4:17 PM Tom Barber  wrote:
>>
>>> Also just to follow up on that slightly, I did also try off the back of
>>> another comment:
>>>
>>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>>   val job = this.job.asInstanceOf[SparklerJob]
>>>
>>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>>
>>>   val scoreUpdateRdd: RDD[SolrInputDocument] = 
>>> scoredRdd.repartition(50).map(d => ScoreUpdateSolrTransformer(d))
>>>
>>>
>>> Where I repartitioned that scoredRdd map out of interest, it then
>>> triggers the FairFetcher function there, instead of in the runJob(), but
>>> still on a single executor 
>>>
>>> Tom
>>>
>>> On Wed, Jun 9, 2021 at 4:11 PM Tom Barber  wrote:
>>>
>>>>
>>>> Okay so what happens is that the crawler reads a bunch of Solr data,
>>>> we're not talking GBs, just a list of JSON, and turns that into a bunch of
>>>> RDDs that end up in that flatMap that I linked to first.
>>>>
>>>> The fair fetcher is an interface to a pluggable backend that basically
>>>> takes some of the fields and goes and crawls websites listed in them
>>>> looking for information. We wrote this code 6 years ago for a DARPA project
>>>> tracking down criminals on the web. Now I'm reusing it but trying to force
>>>> it to scale out a bit more.
>>>>
>>>> Say I have 4000 urls I want to crawl and a 3 node Spark cluster, I want
>>>> to push down 1 URL (a few more wont hurt, but crawling 50 urls in parallel
>>>> on one node makes my cluster sad) to each executor and have it run a crawl,
>>>> then move on and get another one and so on. That way you're not saturating
>>>> a node trying to look up all of them and you could add more nodes for
>>>> greater capacity pretty quickly. Once the website has been captured, you
>>>> can then "score" it for want of a better term to determine its usefulness,
>>>> which is where the map is being triggered.
>>>>
>>>> In answer to your questions Sean, no action seems triggered until you
>>>> end up in the score block and the sc.runJob() because that's literally the
>>>> next line of functionality as Kafka isn't enabled.
>>>>
>>>> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>>>   .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, 
>>>> rs.iterator, localFetchDelay,
>>>> FetchFunction, ParseFunction, OutLinkFilterFunction, 
>>>> StatusUpdateSolrTransformer).toSeq })
>>>>   .persist()
>>>>
>>>> if (kafkaEnable) {
>>>>   storeContentKafka(kafkaListeners, kafkaTopic.format(jobId), fetchedRdd)
>>>> }
>>>> val scoredRdd = score(fetchedRdd)
>>>>
>>>>

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Chris Martin
One thing I would check is this line:

val fetchedRdd = rdd.map(r => (r.getGroup, r))

how many distinct groups do you end up with?  If there's just one then I
think you might see the behaviour you observe.
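
To show why the number of distinct group keys matters, here is a small plain-Java sketch that does not use Spark at all. It only models the usual hash-partitioning rule (non-negative modulo of the key's hashCode, which as far as I know is also what Spark's HashPartitioner does). PartitionSkewDemo and its methods are hypothetical names, the "seed" key is taken from this thread, and the "id-N" keys are made-up stand-ins for distinct keys such as record IDs.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical demo class; it only models how group keys map to partitions.
final class PartitionSkewDemo {

    // Non-negative modulo of the key's hashCode, the usual hash-partitioning rule.
    static int partitionFor(Object key, int numPartitions) {
        int mod = key.hashCode() % numPartitions;
        return mod < 0 ? mod + numPartitions : mod;
    }

    // Counts how many partitions receive at least one of the given keys.
    static int partitionsUsed(String[] keys, int numPartitions) {
        Set<Integer> used = new HashSet<>();
        for (String key : keys) {
            used.add(partitionFor(key, numPartitions));
        }
        return used.size();
    }

    public static void main(String[] args) {
        int records = 4000;
        int partitions = 50;
        String[] sameKey = new String[records];
        String[] distinctKeys = new String[records];
        for (int i = 0; i < records; i++) {
            sameKey[i] = "seed";          // every record shares one group key
            distinctKeys[i] = "id-" + i;  // every record has its own group key
        }
        System.out.println("one group key:       " + partitionsUsed(sameKey, partitions) + " partition(s) used");
        System.out.println("distinct group keys: " + partitionsUsed(distinctKeys, partitions) + " partition(s) used");
    }
}
```

With a single key, all 4000 records land in one partition regardless of the requested partition count, and after a groupByKey the whole group is handled by one task, so the flatMap over it runs in one place.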

Chris


On Wed, Jun 9, 2021 at 4:17 PM Tom Barber  wrote:

> Also just to follow up on that slightly, I did also try off the back of
> another comment:
>
> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>   val job = this.job.asInstanceOf[SparklerJob]
>
>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>
>   val scoreUpdateRdd: RDD[SolrInputDocument] = 
> scoredRdd.repartition(50).map(d => ScoreUpdateSolrTransformer(d))
>
>
> Where I repartitioned that scoredRdd map out of interest, it then triggers
> the FairFetcher function there, instead of in the runJob(), but still on a
> single executor 
>
> Tom
>
> On Wed, Jun 9, 2021 at 4:11 PM Tom Barber  wrote:
>
>>
>> Okay so what happens is that the crawler reads a bunch of Solr data,
>> we're not talking GBs, just a list of JSON, and turns that into a bunch of
>> RDDs that end up in that flatMap that I linked to first.
>>
>> The fair fetcher is an interface to a pluggable backend that basically
>> takes some of the fields and goes and crawls websites listed in them
>> looking for information. We wrote this code 6 years ago for a DARPA project
>> tracking down criminals on the web. Now I'm reusing it but trying to force
>> it to scale out a bit more.
>>
>> Say I have 4000 urls I want to crawl and a 3 node Spark cluster, I want
>> to push down 1 URL (a few more wont hurt, but crawling 50 urls in parallel
>> on one node makes my cluster sad) to each executor and have it run a crawl,
>> then move on and get another one and so on. That way you're not saturating
>> a node trying to look up all of them and you could add more nodes for
>> greater capacity pretty quickly. Once the website has been captured, you
>> can then "score" it for want of a better term to determine its usefulness,
>> which is where the map is being triggered.
>>
>> In answer to your questions Sean, no action seems triggered until you end
>> up in the score block and the sc.runJob() because that's literally the next
>> line of functionality as Kafka isn't enabled.
>>
>> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>   .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, 
>> rs.iterator, localFetchDelay,
>> FetchFunction, ParseFunction, OutLinkFilterFunction, 
>> StatusUpdateSolrTransformer).toSeq })
>>   .persist()
>>
>> if (kafkaEnable) {
>>   storeContentKafka(kafkaListeners, kafkaTopic.format(jobId), fetchedRdd)
>> }
>> val scoredRdd = score(fetchedRdd)
>>
>>
>> That if block is disabled so the score function runs. Inside of that:
>>
>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>   val job = this.job.asInstanceOf[SparklerJob]
>>
>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>
>>   val scoreUpdateRdd: RDD[SolrInputDocument] = scoredRdd.map(d => 
>> ScoreUpdateSolrTransformer(d))
>>   val scoreUpdateFunc = new SolrStatusUpdate(job)
>>   sc.runJob(scoreUpdateRdd, scoreUpdateFunc)
>> 
>>
>>
>> When it's doing stuff in the Spark UI I can see that it's waiting on the
>> sc.runJob() line, so that's the execution point.
>>
>>
>> Tom
>>
>> On Wed, Jun 9, 2021 at 3:59 PM Sean Owen  wrote:
>>
>>> persist() doesn't even persist by itself - just sets it to be persisted
>>> when it's executed.
>>> key doesn't matter here, nor partitioning, if this code is trying to run
>>> things on the driver inadvertently.
>>> I don't quite grok what the OSS code you linked to is doing, but it's
>>> running some supplied functions very directly and at a low-level with
>>> sc.runJob, which might be part of how this can do something unusual.
>>> How do you trigger any action? What happens after persist()?
>>>
>>> On Wed, Jun 9, 2021 at 9:48 AM Tom Barber  wrote:
>>>
 Thanks Mich,

 The key on the first iteration is just a string that says "seed", so it
 is indeed on the first crawl the same across all of the groups. Further
 iterations would be different, but I'm not there yet. I was under the
 impression that a repartition would distribute the tasks. Is that not the
 case?
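
For reference, a minimal sketch of why a constant key matters here. In the snippet quoted in this thread, groupByKey(m) runs on (r.getGroup, r) pairs, so every record carrying the key "seed" ends up in a single group, and the flatMap over that group is one task, whatever repartitioning happened earlier. The plain-Scala code below is only an illustration: `partitionFor` is a hypothetical helper that is assumed to mirror the non-negative modulo rule of hash partitioning, and no Spark classes are used.

```scala
object SeedKeySkew {
  // Assumed to mirror hash partitioning: the partition index depends only
  // on the key's hashCode, folded into the range [0, numPartitions).
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Counts how many records land in each partition index.
  def recordsPerPartition(keys: Seq[String], numPartitions: Int): Map[Int, Int] =
    keys.groupBy(k => partitionFor(k, numPartitions)).map { case (p, ks) => (p, ks.size) }
}
```

With 4000 records that all carry the key "seed" and 50 partitions, only one partition index is ever used; with a distinct key per record (for example a per-host group), the records spread out. Whether a per-host key suits the crawler is a design question for the project, not something this sketch decides.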

 Thanks

 Tom

 On Wed, Jun 9, 2021 at 3:44 PM Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> Hi Tom,
>
> persist() here simply means persist to memory. That is all. You can
> check the Storage tab in the UI
>
>
> https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
>
> So I gather the code is stuck from your link in the driver. You stated
> that you tried repartition() but it did not do anything,
>
> Further you stated :
>
> " The key is pretty static in these tests, so I have also tried
> forcing the partition count (50 on a 16 core per node cluster) and also
> repartitioning, but every time all the jobs are scheduled 

GPU job in Spark 3

2021-04-09 Thread Martin Somers
Hi Everyone !!

I'm trying to get an on-premise GPU instance of Spark 3 running on my Ubuntu
box, and I am following:
https://nvidia.github.io/spark-rapids/docs/get-started/getting-started-on-prem.html#example-join-operation

Does anyone have any insight into why a Spark job isn't being run on the GPU?
It appears to run entirely on the CPU. The Hadoop binary is installed and
appears to be functioning fine.

export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)

here is my setup on ubuntu20.10


▶ nvidia-smi

NVIDIA-SMI 460.39   Driver Version: 460.39   CUDA Version: 11.2
GPU 0: GeForce RTX 3090   Persistence-M: Off   Bus-Id: :21:00.0   Disp.A: On
Fan: 0%   Temp: 38C   Perf: P8   Pwr:Usage/Cap: 19W / 370W
Memory-Usage: 478MiB / 24265MiB   GPU-Util: 0%   Compute M.: Default
Volatile Uncorr. ECC: N/A   MIG M.: N/A

/opt/sparkRapidsPlugin


▶ ls
cudf-0.18.1-cuda11.jar  getGpusResources.sh  rapids-4-spark_2.12-0.4.1.jar

▶ scalac --version
Scala compiler version 2.13.0 -- Copyright 2002-2019, LAMP/EPFL and
Lightbend, Inc.


▶ spark-shell --version
2021-04-09 17:05:36,158 WARN util.Utils: Your hostname, studio resolves to
a loopback address: 127.0.1.1; using 192.168.0.221 instead (on interface
wlp71s0)
2021-04-09 17:05:36,159 WARN util.Utils: Set SPARK_LOCAL_IP if you need to
bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform
(file:/opt/spark/jars/spark-unsafe_2.12-3.1.1.jar) to constructor
java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of
org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal
reflective access operations
WARNING: All illegal access operations will be denied in a future release
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.1
      /_/

Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 11.0.10
Branch HEAD
Compiled by user ubuntu on 2021-02-22T01:04:02Z
Revision 1d550c4e90275ab418b9161925049239227f3dc9
Url https://github.com/apache/spark
Type --help for more information.


Here is how I am calling Spark prior to adding the test job:

$SPARK_HOME/bin/spark-shell \
   --master local \
   --num-executors 1 \
   --conf spark.executor.cores=16 \
   --conf spark.rapids.sql.concurrentGpuTasks=1 \
   --driver-memory 10g \
   --conf spark.executor.extraClassPath=${SPARK_CUDF_JAR}:${SPARK_RAPIDS_PLUGIN_JAR} \
   --conf spark.rapids.memory.pinnedPool.size=16G \
   --conf spark.locality.wait=0s \
   --conf spark.sql.files.maxPartitionBytes=512m \
   --conf spark.sql.shuffle.partitions=10 \
   --conf spark.plugins=com.nvidia.spark.SQLPlugin \
   --files $SPARK_RAPIDS_DIR/getGpusResources.sh \
   --jars ${SPARK_CUDF_JAR},${SPARK_RAPIDS_PLUGIN_JAR}


Test job is from the example join-operation

val df = sc.makeRDD(1 to 1000, 6).toDF
val df2 = sc.makeRDD(1 to 1000, 6).toDF
df.select($"value" as "a").join(df2.select($"value" as "b"), $"a" === $"b").count


I just noticed that the Scala versions are out of sync (scalac is 2.13.0,
Spark uses 2.12.10). That shouldn't affect it, should it?


Is there anything else I can try in the --conf settings, or are there any logs
that show what might be failing behind the scenes? Any suggestions?


Thanks
Martin


-- 
M


Structured Streaming together with Cassandra Queries

2018-09-22 Thread Martin Engen
Hello,

I have a case where I am continuously getting a bunch of sensor data, which is
being stored into a Cassandra table (through Kafka). Every week or so, I want
to manually enter additional data into the system, and I want this to trigger
some calculations merging the manually entered data and the week's worth of
streaming sensor data.

Is there a way to make dynamic Cassandra queries based on data coming into
Spark?

Example: pressure sensor readings are being continuously stored into Cassandra,
and I enter a week's worth of temperatures into the system at the end of the
week (1 day/row at a time).
I want each of these rows to trigger queries to Cassandra to get the pressures
for that specific day, and do some calculations on them.

I have been looking at using Structured Streaming with the
Cassandra-spark-connector, but I can't find a way to take data from a row in
Structured Streaming into account in the query being made. Instead, I seem to
have to query for 'everything' and then filter in Spark.

Any ideas or tips for how to solve this?


How to work around NoOffsetForPartitionException when using Spark Streaming

2018-06-01 Thread Martin Peng
Hi,

We see the exception below when using Spark Kafka streaming 0.10 on a normal
Kafka topic. I'm not sure why the offset is missing in zk, but since Spark
Streaming overrides the offset reset policy to none in the code, I cannot set
the reset policy to latest (I don't really care about data loss right now).

Is there any quick way to fix the missing offset or work around this?

Thanks,
Martin

1/06/2018 17:11:02: ERROR:the type of error is
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined
offset with no reset policy for partition:
elasticsearchtopicrealtimereports-97
01/06/2018 17:11:02: ERROR:Undefined offset with no reset policy for
partition: elasticsearchtopicrealtimereports-97
org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:370)
org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:248)
org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1601)
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1034)
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:165)
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:184)
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:211)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
scala.Option.orElse(Option.scala:289)
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
scala.collection.immutable.List.foreach(List.scala:381)
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
scala.collection.immutable.List.map(List.scala:285)
org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
scala.Option.orElse(Option.scala:289)
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
scala.Option.orElse(Option.scala:289)
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330

Re: Structured Streaming, Reading and Updating a variable

2018-05-16 Thread Martin Engen
Executor.scala:338)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)


Any ideas about how to handle this error?


Thanks,
Martin Engen

From: Lalwani, Jayesh <jayesh.lalw...@capitalone.com>
Sent: Tuesday, May 15, 2018 9:59 PM
To: Martin Engen; user@spark.apache.org
Subject: Re: Structured Streaming, Reading and Updating a variable


Do you have a code sample, and detailed error message/exception to show?



From: Martin Engen <martin.en...@outlook.com>
Date: Tuesday, May 15, 2018 at 9:24 AM
To: "user@spark.apache.org" <user@spark.apache.org>
Subject: Structured Streaming, Reading and Updating a variable



Hello,

I'm working with Structured Streaming, and I need a method of keeping a running
average based on the last 24 hours of data.

To help with this, I can use Exponential Smoothing, which means I really only
need to store 1 value from a previous calculation into the new, and update this
variable as calculations carry on.

Implementing this is a much bigger challenge than I ever imagined.

I've tried using Accumulators and to Query/Store data to Cassandra after every
calculation. Both methods worked somewhat locally, but I don't seem to be able
to use these in the Spark Worker Nodes, as I get the error

"java.lang.NoClassDefFoundError: Could not initialize class error" both for the
accumulator and the Cassandra connection library.

How can you read/update a variable while doing calculations using Structured
Streaming?

Thank you









Structured Streaming, Reading and Updating a variable

2018-05-15 Thread Martin Engen
Hello,

I'm working with Structured Streaming, and I need a method of keeping a running
average based on the last 24 hours of data.
To help with this, I can use Exponential Smoothing, which means I really only 
need to store 1 value from a previous calculation into the new, and update this 
variable as calculations carry on.
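
As a point of reference, the single-value update described above can be sketched in plain Scala. This is only an illustration under stated assumptions: the names `ExpSmoothing`, `step` and `update` and the smoothing factor `alpha` are hypothetical, and nothing here touches Spark. It shows that the carried state is one optional number that each new batch folds into.

```scala
object ExpSmoothing {
  // One smoothing step: blend the new observation with the previous smoothed value.
  def step(previous: Double, observation: Double, alpha: Double): Double =
    alpha * observation + (1.0 - alpha) * previous

  // Fold a batch of observations into the carried state.
  // None means no data has been seen yet, so the first observation seeds the state.
  def update(state: Option[Double], batch: Seq[Double], alpha: Double): Option[Double] =
    batch.foldLeft(state) {
      case (None, x)       => Some(x)
      case (Some(prev), x) => Some(step(prev, x, alpha))
    }
}
```

For example, `ExpSmoothing.update(None, Seq(10.0, 20.0), 0.5)` yields `Some(15.0)`. State of this shape is the kind of per-key value that Structured Streaming's arbitrary stateful operations (mapGroupsWithState) are meant to carry between triggers; pairing the two is a suggestion, not something established in this thread.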

Implementing this is a much bigger challenge than I ever imagined.


I've tried using Accumulators and to Query/Store data to Cassandra after every
calculation. Both methods worked somewhat locally, but I don't seem to be able
to use these in the Spark Worker Nodes, as I get the error
"java.lang.NoClassDefFoundError: Could not initialize class error" both for the
accumulator and the Cassandra connection library.

How can you read/update a variable while doing calculations using Structured 
Streaming?

Thank you




Re: Spark Job crash due to File Not found when shuffle intermittently

2017-07-25 Thread Martin Peng
cool~ Thanks Kang! I will check and let you know.
Sorry for delay as there is an urgent customer issue today.

Best
Martin

2017-07-24 22:15 GMT-07:00 周康 <zhoukang199...@gmail.com>:

> * If the file exists but is a directory rather than a regular file, does
> * not exist but cannot be created, or cannot be opened for any other
> * reason then a FileNotFoundException is thrown.
>
> After looking into FileOutputStream I saw this comment. So you can check the
> executor node first (maybe there is no permission, no space left, or not
> enough file descriptors).
>
>
> 2017-07-25 13:05 GMT+08:00 周康 <zhoukang199...@gmail.com>:
>
>> You can also check whether the executor node has enough space left to
>> store the shuffle file.
>>
>> 2017-07-25 13:01 GMT+08:00 周康 <zhoukang199...@gmail.com>:
>>
>>> First, Spark will handle task failures, so if the job ended normally, this
>>> error can be ignored.
>>> Second, when using BypassMergeSortShuffleWriter, it will first write the
>>> data file and then write an index file.
>>> You can check for "Failed to delete temporary index file at" or "fail to
>>> rename file" in the related executor node's log file.
>>>
>>> 2017-07-25 0:33 GMT+08:00 Martin Peng <wei...@gmail.com>:
>>>
>>>> Can anyone shed some light on this issue?
>>>>
>>>> Thanks
>>>> Martin
>>>>
>>>> 2017-07-21 18:58 GMT-07:00 Martin Peng <wei...@gmail.com>:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have several Spark jobs, including both batch and streaming jobs, to
>>>>> process the system logs and analyze them. We are using Kafka as the
>>>>> pipeline to connect the jobs.
>>>>>
>>>>> After upgrading to Spark 2.1.0 + Spark Kafka Streaming 010, I found that
>>>>> some of the jobs (both batch and streaming) throw the exceptions below
>>>>> randomly (either after running for several hours or after just 20
>>>>> minutes). Can anyone give me some suggestions about how to figure out
>>>>> the real root cause? (Google results do not look very useful...)
>>>>>
>>>>> Thanks,
>>>>> Martin
>>>>>
>>>>> 00:30:04,510 WARN  - 17/07/22 00:30:04 WARN TaskSetManager: Lost task
>>>>> 60.0 in stage 1518490.0 (TID 338070, 10.133.96.21, executor 0):
>>>>> java.io.FileNotFoundException:
>>>>> /mnt/mesos/work_dir/slaves/20160924-021501-274760970-5050-7646-S2/frameworks/40aeb8e5-e82a-4df9-b034-8815a7a7564b-2543/executors/0/runs/fd15c15d-2511-4f37-a106-27431f583153/blockmgr-a0e0e673-f88b-4d12-a802-c35643e6c6b2/33/shuffle_2090_60_0.index.b66235be-79be-4455-9759-1c7ba70f91f6
>>>>> (No such file or directory)
>>>>> 00:30:04,510 WARN  - at java.io.FileOutputStream.open0(Native Method)
>>>>> 00:30:04,510 WARN  - at java.io.FileOutputStream.open(FileOutputStream.java:270)
>>>>> 00:30:04,510 WARN  - at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
>>>>> 00:30:04,510 WARN  - at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
>>>>> 00:30:04,510 WARN  - at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:144)
>>>>> 00:30:04,510 WARN  - at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:128)
>>>>> 00:30:04,510 WARN  - at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>>>>> 00:30:04,510 WARN  - at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>>>>> 00:30:04,510 WARN  - at org.apache.spark.scheduler.Task.run(Task.scala:99)
>>>>> 00:30:04,510 WARN  - at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>>>>> 00:30:04,510 WARN  - at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>> 00:30:04,510 WARN  - at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>> 00:30:04,510 WARN  - at java.lang.Thread.run(Thread.ja

Re: Spark Job crash due to File Not found when shuffle intermittently

2017-07-24 Thread Martin Peng
Can anyone shed some light on this issue?

Thanks
Martin

2017-07-21 18:58 GMT-07:00 Martin Peng <wei...@gmail.com>:

> Hi,
>
> I have several Spark jobs, including both batch and streaming jobs, to
> process the system logs and analyze them. We are using Kafka as the pipeline
> to connect the jobs.
>
> After upgrading to Spark 2.1.0 + Spark Kafka Streaming 010, I found that some
> of the jobs (both batch and streaming) throw the exceptions below randomly
> (either after running for several hours or after just 20 minutes). Can anyone
> give me some suggestions about how to figure out the real root cause?
> (Google results do not look very useful...)
>
> Thanks,
> Martin
>
> 00:30:04,510 WARN  - 17/07/22 00:30:04 WARN TaskSetManager: Lost task 60.0
> in stage 1518490.0 (TID 338070, 10.133.96.21, executor 0):
> java.io.FileNotFoundException: /mnt/mesos/work_dir/slaves/
> 20160924-021501-274760970-5050-7646-S2/frameworks/40aeb8e5-e82a-4df9-b034-
> 8815a7a7564b-2543/executors/0/runs/fd15c15d-2511-4f37-a106-
> 27431f583153/blockmgr-a0e0e673-f88b-4d12-a802-
> c35643e6c6b2/33/shuffle_2090_60_0.index.b66235be-79be-4455-9759-1c7ba70f91f6
> (No such file or directory)
> 00:30:04,510 WARN  - at java.io.FileOutputStream.open0(Native Method)
> 00:30:04,510 WARN  - at java.io.FileOutputStream.open(
> FileOutputStream.java:270)
> 00:30:04,510 WARN  - at java.io.FileOutputStream.<
> init>(FileOutputStream.java:213)
> 00:30:04,510 WARN  - at java.io.FileOutputStream.<
> init>(FileOutputStream.java:162)
> 00:30:04,510 WARN  - at org.apache.spark.shuffle.
> IndexShuffleBlockResolver.writeIndexFileAndCommit(
> IndexShuffleBlockResolver.scala:144)
> 00:30:04,510 WARN  - at org.apache.spark.shuffle.sort.
> BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:128)
> 00:30:04,510 WARN  - at org.apache.spark.scheduler.
> ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> 00:30:04,510 WARN  - at org.apache.spark.scheduler.
> ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> 00:30:04,510 WARN  - at org.apache.spark.scheduler.
> Task.run(Task.scala:99)
> 00:30:04,510 WARN  - at org.apache.spark.executor.
> Executor$TaskRunner.run(Executor.scala:282)
> 00:30:04,510 WARN  - at java.util.concurrent.
> ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 00:30:04,510 WARN  - at java.util.concurrent.
> ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 00:30:04,510 WARN  - at java.lang.Thread.run(Thread.java:748)
>
> 00:30:04,580 INFO  - Driver stacktrace:
> 00:30:04,580 INFO  - org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(
> DAGScheduler.scala:1435)
> 00:30:04,580 INFO  - org.apache.spark.scheduler.DAGScheduler$$anonfun$
> abortStage$1.apply(DAGScheduler.scala:1423)
> 00:30:04,580 INFO  - org.apache.spark.scheduler.DAGScheduler$$anonfun$
> abortStage$1.apply(DAGScheduler.scala:1422)
> 00:30:04,580 INFO  - scala.collection.mutable.
> ResizableArray$class.foreach(ResizableArray.scala:59)
> 00:30:04,580 INFO  - scala.collection.mutable.ArrayBuffer.foreach(
> ArrayBuffer.scala:48)
> 00:30:04,580 INFO  - org.apache.spark.scheduler.DAGScheduler.abortStage(
> DAGScheduler.scala:1422)
> 00:30:04,580 INFO  - org.apache.spark.scheduler.DAGScheduler$$anonfun$
> handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
> 00:30:04,580 INFO  - org.apache.spark.scheduler.DAGScheduler$$anonfun$
> handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
> 00:30:04,580 INFO  - scala.Option.foreach(Option.scala:257)
> 00:30:04,580 INFO  - org.apache.spark.scheduler.DAGScheduler.
> handleTaskSetFailed(DAGScheduler.scala:802)
> 00:30:04,580 INFO  - org.apache.spark.scheduler.
> DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
> 00:30:04,580 INFO  - org.apache.spark.scheduler.
> DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
> 00:30:04,580 INFO  - org.apache.spark.scheduler.
> DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
> 00:30:04,580 INFO  - org.apache.spark.util.EventLoop$$anon$1.run(
> EventLoop.scala:48)
> 00:30:04,580 INFO  - org.apache.spark.scheduler.DAGScheduler.runJob(
> DAGScheduler.scala:628)
> 00:30:04,580 INFO  - org.apache.spark.SparkContext.
> runJob(SparkContext.scala:1918)
> 00:30:04,580 INFO  - org.apache.spark.SparkContext.
> runJob(SparkContext.scala:1931)
> 00:30:04,580 INFO  - org.apache.spark.SparkContext.
> runJob(SparkContext.scala:1944)
> 00:30:04,580 INFO  - org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.
> scala:1353)
> 00:30:04,580 INFO  - org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:151)
> 00:30:04,580 INFO  - or

Spark Job crash due to File Not found when shuffle intermittently

2017-07-21 Thread Martin Peng
Hi,

I have several Spark jobs, including both batch and streaming jobs, to
process the system logs and analyze them. We are using Kafka as the pipeline
to connect the jobs.

After upgrading to Spark 2.1.0 + Spark Kafka Streaming 010, I found that some
of the jobs (both batch and streaming) throw the exceptions below randomly
(either after running for several hours or after just 20 minutes). Can anyone
give me some suggestions about how to figure out the real root cause?
(Google results do not look very useful...)

Thanks,
Martin

00:30:04,510 WARN  - 17/07/22 00:30:04 WARN TaskSetManager: Lost task 60.0
in stage 1518490.0 (TID 338070, 10.133.96.21, executor 0):
java.io.FileNotFoundException:
/mnt/mesos/work_dir/slaves/20160924-021501-274760970-5050-7646-S2/frameworks/40aeb8e5-e82a-4df9-b034-8815a7a7564b-2543/executors/0/runs/fd15c15d-2511-4f37-a106-27431f583153/blockmgr-a0e0e673-f88b-4d12-a802-c35643e6c6b2/33/shuffle_2090_60_0.index.b66235be-79be-4455-9759-1c7ba70f91f6
(No such file or directory)
00:30:04,510 WARN  - at java.io.FileOutputStream.open0(Native Method)
00:30:04,510 WARN  - at
java.io.FileOutputStream.open(FileOutputStream.java:270)
00:30:04,510 WARN  - at
java.io.FileOutputStream.<init>(FileOutputStream.java:213)
00:30:04,510 WARN  - at
java.io.FileOutputStream.<init>(FileOutputStream.java:162)
00:30:04,510 WARN  - at
org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:144)
00:30:04,510 WARN  - at
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:128)
00:30:04,510 WARN  - at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
00:30:04,510 WARN  - at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
00:30:04,510 WARN  - at
org.apache.spark.scheduler.Task.run(Task.scala:99)
00:30:04,510 WARN  - at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
00:30:04,510 WARN  - at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
00:30:04,510 WARN  - at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
00:30:04,510 WARN  - at java.lang.Thread.run(Thread.java:748)

00:30:04,580 INFO  - Driver stacktrace:
00:30:04,580 INFO  - org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
00:30:04,580 INFO  -
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
00:30:04,580 INFO  -
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
00:30:04,580 INFO  - scala.Option.foreach(Option.scala:257)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
00:30:04,580 INFO  -
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
00:30:04,580 INFO  -
org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
00:30:04,580 INFO  -
org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
00:30:04,580 INFO  -
org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
00:30:04,580 INFO  -
org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1353)
00:30:04,580 INFO  -
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
00:30:04,580 INFO  -
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
00:30:04,580 INFO  - org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
00:30:04,580 INFO  - org.apache.spark.rdd.RDD.take(RDD.scala:1326)
00:30:04,580 INFO  -
org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1461)
00:30:04,580 INFO  -
org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1461)
00:30:04,580 INFO  -
org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1461)
00:30:04,580 INFO  -
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
00:30:04,580 INFO  -
org.apache.spark.rdd.RDDOperationScope$.withScope

The stability of Spark Stream Kafka 010

2017-06-29 Thread Martin Peng
Hi,

We plan to upgrade our Spark Kafka library to 0.10 from 0.81 to simplify
our infrastructure code logic. Does anybody know when the 010 version will
move from experimental to stable?
Can I use this 010 version together with Spark 1.5.1?

https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

Thanks
Martin


stratified sampling scales poorly

2016-12-19 Thread Martin Le
Hi all,

I perform sampling on a DStream by taking samples from RDDs in the DStream.
I have used two sampling mechanisms: simple random sampling and stratified
sampling.

Simple random sampling: inputStream.transform(x => x.sample(false,
fraction)).

Stratified sampling: inputStream.transform(x => x.sampleByKeyExact(false,
fractions))

where fractions = Map("key1" -> fraction, "key2" -> fraction, ..., "keyn" ->
fraction).

My question is: why does stratified sampling scale poorly with different
sampling fractions in this context, while simple random sampling scales well
with different sampling fractions? (I ran the experiments on a 4-node
cluster.)
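
One difference that may be relevant, going by the RDD API documentation: sampleByKeyExact makes additional passes over the RDD to hit the exact per-key sample size, whereas sample and sampleByKey decide per record in a single pass. The plain-Scala sketch below illustrates only the single-pass, per-record decision. It is not Spark's implementation; `OnePassStratified` is a hypothetical name, and treating keys missing from `fractions` as fraction 0.0 is an assumption made for the sketch.

```scala
import scala.util.Random

object OnePassStratified {
  // Keeps each record independently with the probability configured for its key.
  // One pass over the data; the per-key sample size is only approximately
  // fraction * count, which is the trade-off against an exact-size sample.
  def sampleByKey[K, V](data: Seq[(K, V)], fractions: Map[K, Double], rng: Random): Seq[(K, V)] =
    data.filter { case (k, _) => rng.nextDouble() < fractions.getOrElse(k, 0.0) }
}
```

If approximate per-stratum sizes are acceptable, comparing sampleByKey against sampleByKeyExact in the same experiment would show how much of the cost comes from the exactness guarantee.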

Thank you,

Martin


Re: How to use a custom filesystem provider?

2016-09-21 Thread Jean-Philippe Martin
>
> There's a bit of confusion setting in here; the FileSystem implementations
> spark uses are subclasses of org.apache.hadoop.fs.FileSystem; the nio
> class with the same name is different.
> grab the google cloud storage connector and put it on your classpath


I was using the gs:// filesystem as an example. I should have mentioned
that I'm aware of the workaround for that one.

I'm not asking how to read from Google Cloud Storage from Spark.

What I'm interested in is Java's built-in extension mechanism for its
"Path" objects, aka custom filesystem providers.
What if I want to use my own different custom filesystem provider?
Something that allows me to take a funky-looking string like "foo://bar/baz"
and open it like a regular file, even though this results in a TCP
connection to the bar server, asking it to give me the "baz" file out of
its holographic quantum entangled storage (or other unspecified future
technology that can provide file-like objects).

How to use a custom filesystem provider?

2016-09-21 Thread Jean-Philippe Martin
The full source for my example is available on github
<https://github.com/jean-philippe-martin/SparkRepro>.

I'm using maven to depend on gcloud-java-nio
<https://mvnrepository.com/artifact/com.google.cloud/gcloud-java-nio/0.2.5>,
which provides a Java FileSystem for Google Cloud Storage, via "gs://"
URLs. My Spark project uses maven-shade-plugin to create one big jar with
all the source in it.

The big jar correctly includes a
META-INF/services/java.nio.file.spi.FileSystemProvider file, containing the
correct name for the class
(com.google.cloud.storage.contrib.nio.CloudStorageFileSystemProvider). I
checked and that class is also correctly included in the jar file.

The program uses FileSystemProvider.installedProviders() to list the
filesystem providers it finds. "gs" should be listed (and it is if I run
the same function in a non-Spark context), but when running with Spark on
Dataproc, that provider's gone.
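
A minimal sketch of the check described above, using only the JDK. `ListProviders` is a hypothetical name. Per the JDK documentation, installed providers are loaded through the system class loader, so a provider that lives only in a jar loaded by some other class loader would not appear in this list; whether that is what happens on Dataproc is an assumption, not something verified here.

```scala
import java.nio.file.spi.FileSystemProvider

object ListProviders {
  // Returns the URI schemes of every filesystem provider visible to this JVM.
  def schemes(): List[String] = {
    val it = FileSystemProvider.installedProviders().iterator()
    var out = List.empty[String]
    while (it.hasNext) out = it.next().getScheme :: out
    out.reverse
  }
}
```

Printing `ListProviders.schemes()` on the driver and inside a task would show where "gs" goes missing. The default "file" provider should always be listed.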

I'd like to know: *How can I use a custom filesystem in my Spark program*?
(asked earlier
<http://stackoverflow.com/questions/39500445/filesystem-provider-disappearing-in-spark>
in Stackoverflow but I didn't get any traction there)


DCOS - s3

2016-08-21 Thread Martin Somers
I'm having trouble loading data from an S3 repo.
Currently DCOS is running Spark 2, so I'm not sure whether the code needs a
modification with the upgrade.

My code at the moment looks like this:


sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "xxx")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "xxx")

  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  val fname = "s3n://somespark/datain.csv"
  // val rows = sc.textFile(fname).map { line =>
  // val values = line.split(',').map(_.toDouble)
  // Vectors.dense(values)
  // }


  val rows = sc.textFile(fname)
  rows.count()



The Spark service returns a failed message, but little information about
exactly why the job didn't run.


Any suggestions as to what I can try?
-- 
M


Unsubscribe

2016-08-16 Thread Martin Serrano

Sent from my Verizon Wireless 4G LTE DROID


UNSUBSCRIBE

2016-08-10 Thread Martin Somers
-- 
M


Unsubscribe.

2016-08-09 Thread Martin Somers
Unsubscribe.

Thanks
M


Re: sampling operation for DStream

2016-08-01 Thread Martin Le
How would I do that? If I put the queue inside the .transform operation, it
doesn't work.

On Mon, Aug 1, 2016 at 6:43 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Can you keep a queue per executor in memory?
>
> On Mon, Aug 1, 2016 at 11:24 AM, Martin Le <martin.leq...@gmail.com>
> wrote:
> > Hi Cody and all,
> >
> > Thank you for your answer. I implement simple random sampling (SRS) for
> > DStream using transform method, and it works fine.
> > However, I have a problem when I implement reservoir sampling (RS). In
> RS, I
> > need to maintain a reservoir (a queue) to store selected data items
> (RDDs).
> > If I define a large stream window, the queue also increases  and it
> leads to
> > the driver run out of memory.  I explain my problem in detail here:
> >
> https://docs.google.com/document/d/1YBV_eHH6U_dVF1giiajuG4ayoVN5R8Qw3IthoKvW5ok
> >
> > Could you please give me some suggestions or advice to fix this problem?
> >
> > Thanks
> >
> > On Fri, Jul 29, 2016 at 6:28 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
> >>
> >> Most stream systems you're still going to incur the cost of reading
> >> each message... I suppose you could rotate among reading just the
> >> latest messages from a single partition of a Kafka topic if they were
> >> evenly balanced.
> >>
> >> But once you've read the messages, nothing's stopping you from
> >> filtering most of them out before doing further processing.  The
> >> dstream .transform method will let you do any filtering / sampling you
> >> could have done on an rdd.
> >>
> >> On Fri, Jul 29, 2016 at 9:57 AM, Martin Le <martin.leq...@gmail.com>
> >> wrote:
> >> > Hi all,
> >> >
> >> > I have to handle high-speed rate data stream. To reduce the heavy
> load,
> >> > I
> >> > want to use sampling techniques for each stream window. It means that
> I
> >> > want
> >> > to process a subset of data instead of whole window data. I saw Spark
> >> > support sampling operations for RDD, but for DStream, Spark supports
> >> > sampling operation as well? If not,  could you please give me a
> >> > suggestion
> >> > how to implement it?
> >> >
> >> > Thanks,
> >> > Martin
> >
> >
>


Re: sampling operation for DStream

2016-08-01 Thread Martin Le
Hi Cody and all,

Thank you for your answer. I implemented simple random sampling (SRS) for
DStream using the transform method, and it works fine.
However, I have a problem when I implement reservoir sampling (RS). In RS,
I need to maintain a reservoir (a queue) to store selected data items
(RDDs). If I define a large stream window, the queue also grows, and the
driver runs out of memory. I explain my problem in detail
here:
https://docs.google.com/document/d/1YBV_eHH6U_dVF1giiajuG4ayoVN5R8Qw3IthoKvW5ok

Could you please give me some suggestions or advice to fix this problem?

Thanks
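
For readers following along, here is a minimal plain-Java sketch of classic reservoir sampling (Algorithm R) with a fixed capacity. It is not Spark code and not the implementation described above: the class name Reservoir is made up, and it assumes the reservoir holds individual records rather than whole RDDs, so its memory use is bounded by the capacity regardless of how long the window is.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical illustration class: a fixed-capacity reservoir (Algorithm R).
class Reservoir<T> {
    private final int capacity;
    private final List<T> items;
    private final Random random;
    private long seen = 0;

    Reservoir(int capacity, long seed) {
        this.capacity = capacity;
        this.items = new ArrayList<>(capacity);
        this.random = new Random(seed);
    }

    // Offers one item; after n offers each item is retained with probability capacity / n.
    void offer(T item) {
        seen++;
        if (items.size() < capacity) {
            items.add(item);
        } else {
            // Pick a slot in [0, seen); replace only if it falls inside the reservoir.
            long slot = (long) (random.nextDouble() * seen);
            if (slot < capacity) {
                items.set((int) slot, item);
            }
        }
    }

    // Returns a copy of the current sample.
    List<T> sample() {
        return new ArrayList<>(items);
    }
}
```

The reservoir never holds more than `capacity` items, which is the property that keeps memory flat as the stream grows.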

On Fri, Jul 29, 2016 at 6:28 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Most stream systems you're still going to incur the cost of reading
> each message... I suppose you could rotate among reading just the
> latest messages from a single partition of a Kafka topic if they were
> evenly balanced.
>
> But once you've read the messages, nothing's stopping you from
> filtering most of them out before doing further processing.  The
> dstream .transform method will let you do any filtering / sampling you
> could have done on an rdd.
>
> On Fri, Jul 29, 2016 at 9:57 AM, Martin Le <martin.leq...@gmail.com>
> wrote:
> > Hi all,
> >
> > I have to handle high-speed rate data stream. To reduce the heavy load, I
> > want to use sampling techniques for each stream window. It means that I
> want
> > to process a subset of data instead of whole window data. I saw Spark
> > support sampling operations for RDD, but for DStream, Spark supports
> > sampling operation as well? If not,  could you please give me a
> suggestion
> > how to implement it?
> >
> > Thanks,
> > Martin
>


sampling operation for DStream

2016-07-29 Thread Martin Le
Hi all,

I have to handle a high-rate data stream. To reduce the heavy load, I
want to use sampling techniques for each stream window. That is, I
want to process a subset of the data instead of the whole window. I saw that
Spark supports sampling operations for RDDs, but does it support
sampling operations for DStreams as well? If not, could you please give me a
suggestion on how to implement it?

Thanks,
Martin
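
As an illustration of what sampling a window means, here is a small plain-Java sketch of Bernoulli sampling: each element is kept independently with probability `fraction`. The class and method names are made up for this example and nothing here is Spark API; in Spark itself the same idea would presumably be applied per batch, for example with the RDD sample operation inside a DStream transform.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical illustration class: keep each element with probability `fraction`.
class BernoulliSampler {

    static <T> List<T> sample(List<T> window, double fraction, long seed) {
        Random random = new Random(seed);
        List<T> kept = new ArrayList<>();
        for (T item : window) {
            // nextDouble() is uniform in [0, 1), so this keeps roughly fraction * size items.
            if (random.nextDouble() < fraction) {
                kept.add(item);
            }
        }
        return kept;
    }
}
```

Unlike a fixed-size reservoir, the size of the result varies from window to window; only its expected value is fixed.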


Re: libraryDependencies

2016-07-26 Thread Martin Somers
cheers - I updated

libraryDependencies  ++= Seq(
  // other dependencies here
  "org.apache.spark" %% "spark-core" % "1.6.2" % "provided",
  "org.apache.spark" %% "spark-mllib_2.10" % "1.6.2",
  "org.scalanlp" %% "breeze" % "0.12",
  // native libraries are not included by default. add this if you want them (as of 0.7)
  // native libraries greatly improve performance, but increase jar sizes.
  "org.scalanlp" %% "breeze-natives" % "0.12",
)

and I'm getting a similar error:

Compiling 1 Scala source to
/Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/target/scala-2.11/classes...
[error]
/Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/src/main/scala/MyApp.scala:2:
object mllib is not a member of package org.apache.spark
[error] import org.apache.spark.mllib.linalg.distributed.RowMatrix
[error] ^
[error]
/Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/src/main/scala/MyApp.scala:3:
object mllib is not a member of package org.apache.spark
[error] import org.apache.spark.mllib.linalg.SingularValueDecomposition
[error] ^
[error]
/Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/src/main/scala/MyApp.scala:5:
object mllib is not a member of package org.apache.spark
[error] import org.apache.spark.mllib.linalg.{Vector, Vectors}
[error] ^
[error]
/Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/src/main/scala/MyApp.scala:8:
not found: object breeze

On Tue, Jul 26, 2016 at 8:36 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> Also, you'll want all of the various spark versions to be the same.
>
> On Tue, Jul 26, 2016 at 12:34 PM, Michael Armbrust <mich...@databricks.com
> > wrote:
>
>> If you are using %% (double) then you do not need _2.11.
>>
>> On Tue, Jul 26, 2016 at 12:18 PM, Martin Somers <sono...@gmail.com>
>> wrote:
>>
>>>
>>> my build file looks like
>>>
>>> libraryDependencies  ++= Seq(
>>>   // other dependencies here
>>>   "org.apache.spark" %% "spark-core" % "1.6.2" % "provided",
>>>   "org.apache.spark" %% "spark-mllib_2.11" % "1.6.0",
>>>   "org.scalanlp" % "breeze_2.11" % "0.7",
>>>   // native libraries are not included by default. add this
>>> if you want them (as of 0.7)
>>>   // native libraries greatly improve performance, but
>>> increase jar sizes.
>>>   "org.scalanlp" % "breeze-natives_2.11" % "0.7",
>>> )
>>>
>>> not 100% sure on the version numbers if they are indeed correct
>>> getting an error of
>>>
>>> [info] Resolving jline#jline;2.12.1 ...
>>> [info] Done updating.
>>> [info] Compiling 1 Scala source to
>>> /Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/target/scala-2.11/classes...
>>> [error]
>>> /Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/src/main/scala/MyApp.scala:2:
>>> object mllib is not a member of package org.apache.spark
>>> [error] import org.apache.spark.mllib.linalg.distributed.RowMatrix
>>> 
>>> ...
>>>
>>>
>>> Im trying to import in
>>>
>>> import org.apache.spark.mllib.linalg.distributed.RowMatrix
>>> import org.apache.spark.mllib.linalg.SingularValueDecomposition
>>>
>>> import org.apache.spark.mllib.linalg.{Vector, Vectors}
>>>
>>>
>>> import breeze.linalg._
>>> import breeze.linalg.{ Matrix => B_Matrix }
>>> import breeze.linalg.{ Vector => B_Matrix }
>>> import breeze.linalg.DenseMatrix
>>>
>>> object MyApp {
>>>   def main(args: Array[String]): Unit = {
>>> //code here
>>> }
>>>
>>>
>>> It might not be the correct way of doing this
>>>
>>> Anyone got any suggestion
>>> tks
>>> M
>>>
>>>
>>>
>>>
>>
>


-- 
M


libraryDependencies

2016-07-26 Thread Martin Somers
my build file looks like

libraryDependencies  ++= Seq(
  // other dependencies here
  "org.apache.spark" %% "spark-core" % "1.6.2" % "provided",
  "org.apache.spark" %% "spark-mllib_2.11" % "1.6.0",
  "org.scalanlp" % "breeze_2.11" % "0.7",
  // native libraries are not included by default. add this if you want them (as of 0.7)
  // native libraries greatly improve performance, but increase jar sizes.
  "org.scalanlp" % "breeze-natives_2.11" % "0.7",
)

I'm not 100% sure the version numbers are correct.
I'm getting this error:

[info] Resolving jline#jline;2.12.1 ...
[info] Done updating.
[info] Compiling 1 Scala source to
/Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/target/scala-2.11/classes...
[error]
/Users/studio/.sbt/0.13/staging/42f93875138543b4e1d3/sparksample/src/main/scala/MyApp.scala:2:
object mllib is not a member of package org.apache.spark
[error] import org.apache.spark.mllib.linalg.distributed.RowMatrix

...


I'm trying to import the following:

import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.SingularValueDecomposition

import org.apache.spark.mllib.linalg.{Vector, Vectors}


import breeze.linalg._
import breeze.linalg.{ Matrix => B_Matrix }
// suggested alias: the original reused B_Matrix here, which would be ambiguous when used
import breeze.linalg.{ Vector => B_Vector }
import breeze.linalg.DenseMatrix

object MyApp {
  def main(args: Array[String]): Unit = {
    // code here
  }
}


This might not be the correct way of doing it.

Does anyone have any suggestions?
tks
M


sbt build under scala

2016-07-26 Thread Martin Somers
Just wondering

What is the correct way of building a Spark job using Scala? Are there
any changes coming with Spark v2?

I've been following this post

http://www.infoobjects.com/spark-submit-with-sbt/



Then again, I've mainly been using Docker locally. What is a decent container
for submitting these jobs locally?

I'm getting to a stage where I need to submit jobs remotely and am thinking
about the best way of doing so.


tks

M


SVD output within Spark

2016-07-21 Thread Martin Somers
Just looking at a comparison between Matlab and Spark for SVD with an
input matrix N.


This is the Matlab code (yes, a very small matrix):

N =

    2.5903   -0.0416    0.6023
   -0.1236    2.5596    0.7629
    0.0148   -0.0693    0.2490



U =

   -0.3706   -0.9284    0.0273
   -0.9287    0.3708    0.0014
   -0.0114   -0.0248   -0.9996

Spark code

// Breeze to spark
val N1D = N.reshape(1, 9).toArray


// Note I had to transpose array to get correct values with incorrect signs
val V2D = N1D.grouped(3).toArray.transpose


// Then convert the array into a RDD
// val NVecdis = Vectors.dense(N1D.map(x => x.toDouble))
// val V2D = N1D.grouped(3).toArray


val rowlocal = V2D.map{x => Vectors.dense(x)}
val rows = sc.parallelize(rowlocal)
val mat = new RowMatrix(rows)
val svd = mat.computeSVD(mat.numCols().toInt, computeU=true)



Spark Output - notice the change in sign on the 2nd and 3rd column
-0.3158590633523746   0.9220516369164243   -0.22372713505049768
-0.8822050381939436   -0.3721920780944116  -0.28842213436035985
-0.34920956843045253  0.10627246051309004  0.9309988407367168



And finally some julia code
N  = [2.59031    -0.0416335  0.602295;
      -0.123584   2.55964    0.762906;
      0.0148463  -0.0693119  0.249017]

svd(N, thin=true)   --- same as matlab
-0.315859  -0.922052   0.223727
-0.882205   0.372192   0.288422
-0.34921   -0.106272  -0.930999

Most likely it's an issue with my implementation rather than a bug
with SVD within the Spark environment.
My Spark instance is running locally in a Docker container.
Any suggestions?
tks
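
A note on the sign differences: singular vectors are only determined up to sign (a column of U can be negated as long as the matching column of V is negated too), so two correct SVD routines can legitimately return columns with opposite signs. The plain-Java sketch below, with made-up names and the Spark and Julia values copied from the post above, flips each column so that its largest-magnitude entry is positive and then compares the two results. The sign convention is just one possible choice, picked for illustration.

```java
// Hypothetical illustration class; SPARK_U and JULIA_U are the values quoted in the post.
class SvdSigns {
    static final double[][] SPARK_U = {
        {-0.3158590633523746,   0.9220516369164243,  -0.22372713505049768},
        {-0.8822050381939436,  -0.3721920780944116,  -0.28842213436035985},
        {-0.34920956843045253,  0.10627246051309004,  0.9309988407367168}
    };
    static final double[][] JULIA_U = {
        {-0.315859, -0.922052,  0.223727},
        {-0.882205,  0.372192,  0.288422},
        {-0.34921,  -0.106272, -0.930999}
    };

    // Returns a copy in which every column is flipped so its largest-magnitude entry is positive.
    static double[][] normalizeColumnSigns(double[][] m) {
        int rows = m.length;
        int cols = m[0].length;
        double[][] out = new double[rows][cols];
        for (int c = 0; c < cols; c++) {
            int pivot = 0;
            for (int r = 1; r < rows; r++) {
                if (Math.abs(m[r][c]) > Math.abs(m[pivot][c])) {
                    pivot = r;
                }
            }
            double sign = m[pivot][c] < 0 ? -1.0 : 1.0;
            for (int r = 0; r < rows; r++) {
                out[r][c] = sign * m[r][c];
            }
        }
        return out;
    }

    // Element-wise comparison within an absolute tolerance.
    static boolean approxEqual(double[][] a, double[][] b, double tol) {
        for (int r = 0; r < a.length; r++) {
            for (int c = 0; c < a[r].length; c++) {
                if (Math.abs(a[r][c] - b[r][c]) > tol) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // prints true: after sign normalization the two matrices agree to about 1e-6
        System.out.println(approxEqual(
            normalizeColumnSigns(SPARK_U), normalizeColumnSigns(JULIA_U), 1e-5));
    }
}
```

So the Spark and Julia outputs shown here describe the same decomposition; only the arbitrary column signs differ.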


Re: Spark streaming takes longer time to read json into dataframes

2016-07-16 Thread Martin Eden
Hi,

I would just do a repartition on the initial direct DStream since otherwise
each RDD in the stream has exactly as many partitions as you have
partitions in the Kafka topic (in your case 1). That way, receiving is
still done in only 1 thread, but at least the processing further down is
done in parallel.

If you want to parallelize your receiving as well I would partition my
Kafka topic and then the RDDs in the initial DStream will have as many
partitions as you set in Kafka.

Have you seen this?
http://spark.apache.org/docs/latest/streaming-kafka-integration.html

M

On Sat, Jul 16, 2016 at 5:26 AM, Diwakar Dhanuskodi <
diwakar.dhanusk...@gmail.com> wrote:

>
> -- Forwarded message --
> From: Diwakar Dhanuskodi 
> Date: Sat, Jul 16, 2016 at 9:30 AM
> Subject: Re: Spark streaming takes longer time to read json into dataframes
> To: Jean Georges Perrin 
>
>
> Hello,
>
> I need it on memory.  Increased executor memory to 25G and executor cores
> to 3. Got same result. There is always one task running under executor for
> rdd.read.json() because rdd partition size is 1 . Doing hash partitioning
> inside foreachRDD is a good approach?
>
> Regards,
> Diwakar.
>
> On Sat, Jul 16, 2016 at 9:20 AM, Jean Georges Perrin  wrote:
>
>> Do you need it on disk or just push it to memory? Can you try to increase
>> memory or # of cores (I know it sounds basic)
>>
>> > On Jul 15, 2016, at 11:43 PM, Diwakar Dhanuskodi <
>> diwakar.dhanusk...@gmail.com> wrote:
>> >
>> > Hello,
>> >
>> > I have 400K json messages pulled from Kafka into spark streaming using
>> DirectStream approach. Size of 400K messages is around 5G.  Kafka topic is
>> single partitioned. I am using rdd.read.json(_._2) inside foreachRDD to
>> convert  rdd into dataframe. It takes almost 2.3 minutes to convert into
>> dataframe.
>> >
>> > I am running in Yarn client mode with executor memory as 15G and
>> executor cores as 2.
>> >
>> > Caching rdd before converting into dataframe  doesn't change processing
>> time. Whether introducing hash partitions inside foreachRDD  will help?
>> (or) Will partitioning topic and have more than one DirectStream help?. How
>> can I approach this situation to reduce time in converting to dataframe..
>> >
>> > Regards,
>> > Diwakar.
>>
>>
>
>


SparkStreaming multiple output operations failure semantics / error propagation

2016-07-14 Thread Martin Eden
Hi,

I have a Spark 1.6.2 streaming job with multiple output operations (jobs)
doing idempotent changes in different repositories.

The problem is that I want to somehow pass errors from one output operation
to another such that  in the current output operation I only update
previously successful messages. This has to propagate all the way to the
last job which is supposed to only ACK the successfully processed messages
to the input queue, leaving the unsuccessful ones un-ACKED for later
processing.

The overall desired behaviour is best effort / fail fast, leaving the
messages which were not successfully processed by all output operations in
the input queue for retrying later.

Is there a pattern for achieving this in SparkStreaming?

If not can SparkStreaming at least guarantee that if the previous
job/output operation in the batch fails, it does not execute the next
jobs/output operations?

Thanks in advance,
M
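
To make the desired behaviour concrete, here is a plain-Java sketch of it, independent of Spark Streaming. All names are made up, and modelling each output operation as a Predicate that returns false (or throws) on failure is an assumption for illustration only. Messages that fail one operation are skipped by all later operations, and only the survivors would be ACKed at the end.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical illustration class: best-effort, fail-fast chaining of output operations.
class FailFastPipeline {

    // Applies each operation in order; a message that fails one operation
    // is not passed to the later ones. Returns the messages that passed all of them.
    static <M> Set<M> survivors(List<M> batch, List<Predicate<M>> operations) {
        Set<M> ok = new LinkedHashSet<>(batch);
        for (Predicate<M> operation : operations) {
            Set<M> next = new LinkedHashSet<>();
            for (M message : ok) {
                boolean succeeded;
                try {
                    succeeded = operation.test(message);
                } catch (RuntimeException e) {
                    // Treat an exception as a failure of this message only.
                    succeeded = false;
                }
                if (succeeded) {
                    next.add(message);
                }
            }
            ok = next;
        }
        return ok;
    }
}
```

Because every operation is idempotent, a message left un-ACKed can simply be retried through the whole chain later.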


Re: DataFrame versus Dataset creation and usage

2016-06-28 Thread Martin Serrano
Xinh,

Thanks for the clarification.  I'm new to Spark and trying to navigate the 
different APIs.  I was just following some examples and retrofitting them, but 
I see now I should stick with plain RDDs until my schema is known (at the end 
of the data pipeline).

Thanks again!

On 06/24/2016 04:57 PM, Xinh Huynh wrote:
Hi Martin,

Since your schema is dynamic, how would you use Datasets? Would you know ahead 
of time the row type T in a Dataset[T]?

One option is to start with DataFrames in the beginning of your data pipeline, 
figure out the field types, and then switch completely over to RDDs or Dataset 
in the next stage of the pipeline.

Also, I'm not sure what the custom Java mappers are doing - could you use them 
as UDFs within a DataFrame?

Xinh

On Fri, Jun 24, 2016 at 11:42 AM, Martin Serrano
<mar...@attivio.com> wrote:
Indeed.  But I'm dealing with 1.6 for now unfortunately.


On 06/24/2016 02:30 PM, Ted Yu wrote:
In Spark 2.0, Dataset and DataFrame are unified.

Would this simplify your use case ?

On Fri, Jun 24, 2016 at 7:27 AM, Martin Serrano
<mar...@attivio.com> wrote:
Hi,

I'm exposing a custom source to the Spark environment.  I have a question about 
the best way to approach this problem.

I created a custom relation for my source and it creates a DataFrame.  My 
custom source knows the data types which are dynamic so this seemed to be the 
appropriate return type.  This works fine.

The next step I want to take is to expose some custom mapping functions 
(written in Java).  But when I look at the APIs, the map method for DataFrame 
returns an RDD (not a DataFrame).  (Should I use SqlContext.createDataFrame on 
the result? -- does this result in additional processing overhead?)  The 
Dataset type seems to be more of what I'd be looking for; its map method 
returns the Dataset type.  So chaining them together is a natural exercise.

But to create the Dataset from a DataFrame, it appears that I have to provide 
the types of each field in the Row in the DataFrame.as[...] method.  I would 
think that the DataFrame would be able to do this automatically since it has 
all the types already.

This leads me to wonder how I should be approaching this effort.  As all the 
fields and types are dynamic, I cannot use beans as my type when passing data 
around.  Any advice would be appreciated.

Thanks,
Martin









Re: DataFrame versus Dataset creation and usage

2016-06-24 Thread Martin Serrano
Indeed.  But I'm dealing with 1.6 for now unfortunately.

On 06/24/2016 02:30 PM, Ted Yu wrote:
In Spark 2.0, Dataset and DataFrame are unified.

Would this simplify your use case ?

On Fri, Jun 24, 2016 at 7:27 AM, Martin Serrano
<mar...@attivio.com> wrote:
Hi,

I'm exposing a custom source to the Spark environment.  I have a question about 
the best way to approach this problem.

I created a custom relation for my source and it creates a DataFrame.  My 
custom source knows the data types which are dynamic so this seemed to be the 
appropriate return type.  This works fine.

The next step I want to take is to expose some custom mapping functions 
(written in Java).  But when I look at the APIs, the map method for DataFrame 
returns an RDD (not a DataFrame).  (Should I use SqlContext.createDataFrame on 
the result? -- does this result in additional processing overhead?)  The 
Dataset type seems to be more of what I'd be looking for; its map method 
returns the Dataset type.  So chaining them together is a natural exercise.

But to create the Dataset from a DataFrame, it appears that I have to provide 
the types of each field in the Row in the DataFrame.as[...] method.  I would 
think that the DataFrame would be able to do this automatically since it has 
all the types already.

This leads me to wonder how I should be approaching this effort.  As all the 
fields and types are dynamic, I cannot use beans as my type when passing data 
around.  Any advice would be appreciated.

Thanks,
Martin







DataFrame versus Dataset creation and usage

2016-06-24 Thread Martin Serrano
Hi,

I'm exposing a custom source to the Spark environment.  I have a question about 
the best way to approach this problem.

I created a custom relation for my source and it creates a DataFrame.  My 
custom source knows the data types which are dynamic so this seemed to be the 
appropriate return type.  This works fine.

The next step I want to take is to expose some custom mapping functions 
(written in Java).  But when I look at the APIs, the map method for DataFrame 
returns an RDD (not a DataFrame).  (Should I use SqlContext.createDataFrame on 
the result? -- does this result in additional processing overhead?)  The 
Dataset type seems to be more of what I'd be looking for; its map method 
returns the Dataset type.  So chaining them together is a natural exercise.

But to create the Dataset from a DataFrame, it appears that I have to provide 
the types of each field in the Row in the DataFrame.as[...] method.  I would 
think that the DataFrame would be able to do this automatically since it has 
all the types already.

This leads me to wonder how I should be approaching this effort.  As all the 
fields and types are dynamic, I cannot use beans as my type when passing data 
around.  Any advice would be appreciated.

Thanks,
Martin





How does Spark Streaming updateStateByKey or mapWithState scale with state size?

2016-06-23 Thread Martin Eden
Hi all,

It is currently difficult to understand from the Spark docs or the
materials online that I came across, how the updateStateByKey
and mapWithState operators in Spark Streaming scale with the size of the
state and how to reason about sizing the cluster appropriately.

According to this article:
https://databricks.com/blog/2016/02/01/faster-stateful-stream-processing-in-apache-spark-streaming.html
mapWithState can handle a lot more state than updateStateByKey but the
discussion there is in terms of number of keys without details about
cluster sizes. What about size in GB?

I have ~100GB worth of state, not all of it updating all the time
obviously, will Spark be able to handle that with these operators? (none of
the state expires)

How big does the cluster have to be to handle this reliably and offer an
uninterrupted service (number of nodes, memory size per node etc)?

How can you deal with bootstrapping?

What about code upgrades?

Ideally I would like to keep my state in Spark so as not to manage an external
data store for it. What is not clear to me is at what state size
I have to move from keeping state in Spark to keeping it in an
external data store.

Thanks


S3n performance (@AaronDavidson)

2016-04-12 Thread Martin Eden
Hi everyone,

Running on EMR 4.3 with Spark 1.6.0 and the provided S3N native driver I
manage to process approx 1TB of strings inside gzipped parquet in about 50
mins on a 20 node cluster (8 cores, 60Gb ram). That's about 17MBytes/sec
per node.
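
The per-node figure can be checked with a quick calculation. The sketch below assumes 1 TB means 10^12 bytes and 1 MB means 10^6 bytes; the class and method names are made up for this example.

```java
// Hypothetical illustration class for the throughput arithmetic.
class Throughput {

    // Average throughput per node in MB/s (1 MB = 1e6 bytes).
    static double mbPerSecPerNode(double totalBytes, double seconds, int nodes) {
        return totalBytes / seconds / nodes / 1e6;
    }

    public static void main(String[] args) {
        // 1 TB in 50 minutes on 20 nodes
        double rate = mbPerSecPerNode(1e12, 50 * 60, 20);
        System.out.println(rate + " MB/s per node");
    }
}
```

With these assumptions the result is roughly 16.7 MB/s per node, consistent with the "about 17" quoted above.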

This seems sub optimal.

The processing is very basic, simple fields extraction from the strings and
a groupBy.

Watching Aaron's talk from the Spark EU Summit:
https://youtu.be/GzG9RTRTFck?t=863

it seems I am hitting the same issues with suboptimal S3 throughput he
mentions there.

I tried different numbers of files for the input data set (more smaller
files vs fewer larger files) combined with various settings
for fs.s3n.block.size, thinking that might help if each mapper streams
larger chunks. It didn't! It actually seems that many small files give
better performance than fewer larger ones (of course with an oversubscribed
number of tasks/threads).

Similarly to what Aaron is mentioning with oversubscribed tasks/threads we
also become CPU bound (reach 100% cpu utilisation).


Has anyone seen a similar behaviour? How can we optimise this?

Are the improvements mentioned in Aaron's talk now part of S3n or S3a
driver or are they just available under DataBricksCloud? How can we benefit
from those improvements?

Thanks,
Martin

P.S. Have not tried S3a.


Re: Direct Kafka input stream and window(…) function

2016-03-29 Thread Martin Soch

Hi Cody,

thanks for your answer. I have finally managed to create simple sample 
code. Here it is:


import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.util.*;

public class SparkTest {

    private static JavaPairInputDStream<String, String> setupKafkaStream(final JavaStreamingContext jssc) {

        final Map<String, String> params = new HashMap<>();
        params.put("group.id", "TestApp");
        params.put("metadata.broker.list", "localhost:9092");

        final Set<String> topics = new HashSet<>();
        topics.add("test");

        return KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                params,
                topics
        );
    }

    public static void main(String args[]) {
        final SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SparkTest");
        final JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        // setup
        // final JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", );
        final JavaPairInputDStream<String, String> s = setupKafkaStream(jssc);

        final JavaDStream<String> lines = s.map(tuple -> tuple._2());

        // 10-second window over 1-second batches, so consecutive windows overlap
        final JavaDStream<String> words =
                lines.window(Durations.seconds(10)).flatMap(line -> Arrays.asList(line.split(" ")));

        final JavaPairDStream<String, Integer> wordCounts =
                words.mapToPair(word -> new Tuple2<>(word, 1))
                        .reduceByKey((a, b) -> a + b);

        wordCounts.print();

        // ConsoleShutdownTrigger is my own helper class (not shown here)
        final ConsoleShutdownTrigger consoleShutdownTrigger = new ConsoleShutdownTrigger(() -> {
            jssc.stop(false, true);
        });
        consoleShutdownTrigger.start();

        jssc.start();
        jssc.awaitTermination();
    }
}

When I run this app the pipeline is stopped (or blocked). But if I 
switch from direct-kafka-stream to (for instance) socket-text-stream the 
app works as expected.


Since this is a possible use case (the API allows it), I would like to know 
whether I hit some limitation (or bug) in Spark-Kafka.


I am using Spark 1.5.0.

Thanks
Martin



On 03/22/2016 06:24 PM, Cody Koeninger wrote:

I definitely have direct stream jobs that use window() without
problems... Can you post a minimal code example that reproduces the
problem?

Using print() will confuse the issue, since print() will try to only
use the first partition.

Use foreachRDD { rdd => rdd.foreach(println) }

or something comparable

On Tue, Mar 22, 2016 at 10:14 AM, Martin Soch <martin.s...@oracle.com> wrote:

Hi all,

I am using direct-Kafka-input-stream in my Spark app. When I use window(...)
function in the chain it will cause the processing pipeline to stop - when I
open the Spark-UI I can see that the streaming batches are being queued and
the pipeline reports to process one of the first batches.

To be more correct: the issue happens only when the windows overlap (if
sliding_interval < window_length). Otherwise the system behaves as expected.

Derivations of window(..) function - like reduceByKeyAndWindow(..), etc.
works also as expected - pipeline doesn't stop. The same applies when using
different type of stream.

Is it some known limitation of window(..) function when used with
direct-Kafka-input-stream ?

Java pseudo code:
org.apache.spark.streaming.kafka.DirectKafkaInputDStream s;
s.window(Durations.seconds(10)).print();  // the pipeline will stop

Thanks
Martin

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org






Direct Kafka input stream and window(…) function

2016-03-22 Thread Martin Soch

Hi all,

I am using direct-Kafka-input-stream in my Spark app. When I use 
window(...) function in the chain it will cause the processing pipeline 
to stop - when I open the Spark-UI I can see that the streaming batches 
are being queued and the pipeline reports to process one of the first 
batches.


To be more correct: the issue happens only when the windows overlap (if 
sliding_interval < window_length). Otherwise the system behaves as expected.


Derivatives of the window(..) function, such as reduceByKeyAndWindow(..), 
also work as expected: the pipeline doesn't stop. The same applies when 
using a different type of stream.


Is it some known limitation of window(..) function when used with 
direct-Kafka-input-stream ?


Java pseudo code:
org.apache.spark.streaming.kafka.DirectKafkaInputDStream s;
s.window(Durations.seconds(10)).print();  // the pipeline will stop

Thanks
Martin




Fwd: Connection failure followed by bad shuffle files during shuffle

2016-03-15 Thread Eric Martin
Hi,

I'm running into consistent failures during a shuffle read while trying to
do a group-by followed by a count aggregation (using the DataFrame API on
Spark 1.5.2).

The shuffle read (in stage 1) fails with

org.apache.spark.shuffle.FetchFailedException: Failed to send RPC
7719188499899260109 to host_a/ip_a:35946:
java.nio.channels.ClosedChannelException
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:321)


Looking into the executor logs first shows

ERROR TransportChannelHandler: Connection to host_b/ip_b:38804 has been
quiet for 12 ms while there are outstanding requests. Assuming
connection is dead; please adjust spark.network.timeout if this is wrong.

on the node that threw the FetchFailedException (host_a) and

ERROR TransportRequestHandler: Error sending result
ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=207789700738,
chunkIndex=894},
buffer=FileSegmentManagedBuffer{file=/local_disk/spark-ed6667d4-445b-4d65-bfda-e4540b7215aa/executor-d03e5e7e-57d4-40e2-9021-c20d0b84bf75/blockmgr-05d5f2b6-142e-415c-a08b-58d16a10b8bf/27/shuffle_1_13732_0.data,
offset=18960736, length=19477}} to /ip_a:32991; closing connection

on the node referenced in the exception (host_b). The error in the host_b
logs occurred a few seconds after the error in the host_a logs. I noticed
there was a lot of spilling going on during the shuffle read, so I
attempted to work around this problem by increasing the number of shuffle
partitions (to decrease spilling) as well as increasing
spark.network.timeout. Neither of these got rid of these connection
failures.

This causes some of stage 0 to recompute (which runs successfully). Stage 1
retry 1 then always fails with

java.io.IOException: FAILED_TO_UNCOMPRESS(5)
at
org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)

Changing the spark.io.compression.codec to lz4 changes this error to

java.io.IOException: Stream is corrupted
at
net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:153)

which leads me to believe that the timeout during the shuffle read failure
leaves invalid files on disk.

Notably, these failures do not occur when I run on smaller subsets of data.
The failure is occurring while attempting to group ~100 billion rows into
20 billion groups (with key size of 24 bytes and count as the only
aggregation) on a 16 node cluster. I've replicated this failure on 2
completely separate clusters (both running with standalone cluster manager).

Does anyone have suggestions about how I could make this crash go away or
how I could try to make a smaller failing test case so the bug can be more
easily investigated?

Best,
Eric Martin


Problem running JavaDirectKafkaWordCount

2016-03-12 Thread Martin Andreoni
 

Hi, I'm starting with spark and I'm having some issues. 

When I run the 'jar' example of JavaDirectKafkaWordCount it works
perfectly. However, when I compile the code myself and submit it, I
get the following error: 

> ERROR ActorSystemImpl: Uncaught fatal error from thread 
> [sparkDriver-akka.actor.default-dispatcher-2] shutting down ActorSystem 
> [sparkDriver]
> java.lang.NoSuchMethodError: 
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.id()I

The program is running locally and the driver is configured with 4g of memory.
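
A NoSuchMethodError like the one above usually points to a mismatch between the Spark version the code was compiled against and the one it runs on. The message does not show the build, so the following build.sbt fragment is only a sketch under that assumption; the version numbers are placeholders, and the same idea applies to a Maven build.

```scala
// build.sbt (sketch): keep every Spark artifact on the same version as the cluster.
// "1.6.0" and "2.10.5" are placeholders; substitute what `spark-submit --version` reports.
val sparkVersion = "1.6.0"

scalaVersion := "2.10.5"

libraryDependencies ++= Seq(
  // Provided by the Spark installation at runtime, so not bundled into the application jar.
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
  // Not part of the Spark distribution; must be bundled, and must match sparkVersion.
  "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion
)
```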

Some help is always welcome, 

Thanks. 
-- 

-----

MARTIN ANDREONI 
 

Frustration over Spark and Jackson

2016-02-16 Thread Martin Skøtt
Hi,

I recently started experimenting with Spark Streaming for ingesting and
enriching content from a Kafka stream. Being new to Spark I expected a bit
of a learning curve, but not with something as simple as using JSON data!

I have a JAR with common classes used across a number of Java projects
which I would also like to use in my Spark projects, but it uses a version
of Jackson which is newer than the one packaged with Spark - I can't (and
won't) downgrade to the older version in Spark. Any suggestions on how to
solve this?

I have looked at using the shade plugin to rename my version of Jackson,
but that would require me to change my common code which I would like to
avoid.
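
For what it is worth, relocation at packaging time rewrites bytecode, so it should not require source changes in the common code. Below is a minimal sketch assuming an sbt build with the sbt-assembly plugin (the maven-shade-plugin has an equivalent relocation feature); the target package name is made up. Spark's experimental spark.driver.userClassPathFirst and spark.executor.userClassPathFirst settings are another avenue worth checking.

```scala
// build.sbt fragment (sketch); requires the sbt-assembly plugin (0.14.0 or later).
// The target package "shadedjackson" is an arbitrary, hypothetical name.
assemblyShadeRules in assembly := Seq(
  // Rewrites Jackson's packages in the bytecode of the application and all bundled dependencies.
  ShadeRule.rename("com.fasterxml.jackson.**" -> "shadedjackson.@1").inAll
)
```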


-- 
Kind regards
Martin


Sorry, but Nabble and ML suck

2015-10-31 Thread Martin Senne
Having written a post last Tuesday, I'm still not able to see it
on Nabble. And yes, my subscription to u...@apache.spark.org was
successful (rechecked a minute ago).

What's more, I have no way of knowing (and no confirmation) whether my post was
accepted, rejected, or whatever.

This is very L4M3 and so 80ies.

Any help appreciated. Thx!


Re: Sorry, but Nabble and ML suck

2015-10-31 Thread Martin Senne
Thanks Nicholas for clarifying.

Having said that, it's not about blaming but about improving.

The fact that my post from Tuesday is not visible on Nabble and that I
received no answer makes me doubt it got posted correctly. On the other hand,
you can read my recent post. I'm just irritated.

Hope to see things improve ...

Cheers, Martin
On 31.10.2015 17:34, "Nicholas Chammas" <nicholas.cham...@gmail.com> wrote:

> Nabble is an unofficial archive of this mailing list. I don't know who
> runs it, but it's not Apache. There are often delays between when things
> get posted to the list and updated on Nabble, and sometimes things never
> make it over for whatever reason.
>
> This mailing list is, I agree, very 1980s. Unfortunately, it's required by
> the Apache Software Foundation (ASF).
>
> There was a discussion earlier this year
> <https://mail-archives.apache.org/mod_mbox/spark-user/201412.mbox/%3CCAOhmDzfL2COdysV8r5hZN8f=NqXM=f=oy5no2dhwj_kveop...@mail.gmail.com%3E>
>  about
> migrating to Discourse that explained why we're stuck with what we have for
> now. Ironically, that discussion is hard to follow on the Apache archives
> (which is precisely one of the motivations for proposing to migrate to
> Discourse), but there is a more readable archive on another unofficial
> site
> <http://search-hadoop.com/m/q3RTtzu5vu1tD3w52=Discourse+A+proposed+alternative+to+the+Spark+User+list>
> .
>
> Nick
>
> On Sat, Oct 31, 2015 at 12:20 PM Martin Senne <martin.se...@googlemail.com>
> wrote:
>
>> Having written a post on last Tuesday, I'm still not able to see my post
>> under nabble. And yeah, subscription to u...@apache.spark.org was
>> successful (rechecked a minute ago)
>>
>> Even more, I have no way (and no confirmation) that my post was accepted,
>> rejected, whatever.
>>
>> This is very L4M3 and so 80ies.
>>
>> Any help appreciated. Thx!
>>
>


Re: Sorry, but Nabble and ML suck

2015-10-31 Thread Martin Senne
Ted, thx. Should I repost?
On 31.10.2015 17:41, "Ted Yu" <yuzhih...@gmail.com> wrote:

> From the result of http://search-hadoop.com/?q=spark+Martin+Senne ,
> Martin's post Tuesday didn't go through.
>
> FYI
>
> On Sat, Oct 31, 2015 at 9:34 AM, Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> Nabble is an unofficial archive of this mailing list. I don't know who
>> runs it, but it's not Apache. There are often delays between when things
>> get posted to the list and updated on Nabble, and sometimes things never
>> make it over for whatever reason.
>>
>> This mailing list is, I agree, very 1980s. Unfortunately, it's required
>> by the Apache Software Foundation (ASF).
>>
>> There was a discussion earlier this year
>> <https://mail-archives.apache.org/mod_mbox/spark-user/201412.mbox/%3CCAOhmDzfL2COdysV8r5hZN8f=NqXM=f=oy5no2dhwj_kveop...@mail.gmail.com%3E>
>>  about
>> migrating to Discourse that explained why we're stuck with what we have for
>> now. Ironically, that discussion is hard to follow on the Apache archives
>> (which is precisely one of the motivations for proposing to migrate to
>> Discourse), but there is a more readable archive on another unofficial
>> site
>> <http://search-hadoop.com/m/q3RTtzu5vu1tD3w52=Discourse+A+proposed+alternative+to+the+Spark+User+list>
>> .
>>
>> Nick
>>
>> On Sat, Oct 31, 2015 at 12:20 PM Martin Senne <
>> martin.se...@googlemail.com> wrote:
>>
>>> Having written a post on last Tuesday, I'm still not able to see my post
>>> under nabble. And yeah, subscription to u...@apache.spark.org was
>>> successful (rechecked a minute ago)
>>>
>>> Even more, I have no way (and no confirmation) that my post was
>>> accepted, rejected, whatever.
>>>
>>> This is very L4M3 and so 80ies.
>>>
>>> Any help appreciated. Thx!
>>>
>>
>


Why does predicate pushdown not work on HiveContext (concrete HiveThriftServer2) ?

2015-10-31 Thread Martin Senne
Hi all,

# Program Sketch

   1. I create a HiveContext `hiveContext`.
   2. With that context, I create a DataFrame `df` from a JDBC relational
   table.
   3. I register the DataFrame `df` via

   df.registerTempTable("TESTTABLE")

   4. I start a HiveThriftServer2 via

   HiveThriftServer2.startWithContext(hiveContext)

The TESTTABLE contains 1,000,000 entries, columns are ID (INT) and NAME
(VARCHAR)

+-----+--------+
| ID  |  NAME  |
+-----+--------+
| 1   | Hello  |
| 2   | Hello  |
| 3   | Hello  |
| ... | ...    |
With Beeline I access the SQL Endpoint (at port 1) of the
HiveThriftServer and perform a query. E.g.

SELECT * FROM TESTTABLE WHERE ID='3'

When I inspect the QueryLog of the DB with the SQL Statements executed I see

/*SQL #:100 t:657*/  SELECT \"ID\",\"NAME\" FROM test;

So no predicate pushdown happens, as the WHERE clause is missing.

# Questions

This gives rise to the following questions:

   1. Why is no predicate pushdown performed?
   2. Can this be changed by not using registerTempTable? If so, how?
   3. Or is this a known restriction of the HiveThriftServer?

# Counterexample

If I create a DataFrame `df` in Spark SQLContext and call

df.filter( df("ID") === 3).show()

I observe

/*SQL #:1*/SELECT \"ID\",\"NAME\" FROM test WHERE ID = 3;

as expected.


Why is no predicate pushdown performed, when using Hive (HiveThriftServer2) ?

2015-10-28 Thread Martin Senne
Hi all,

# Program Sketch


   1. I create a HiveContext `hiveContext`
   2. With that context, I create a DataFrame `df` from a JDBC relational
   table.
   3. I register the DataFrame `df` via

   df.registerTempTable("TESTTABLE")

   4. I start a HiveThriftServer2 via

   HiveThriftServer2.startWithContext(hiveContext)



The TESTTABLE contains 1,000,000 entries, columns are ID (INT) and NAME
(VARCHAR)

+-----+--------+
| ID  |  NAME  |
+-----+--------+
| 1   | Hello  |
| 2   | Hello  |
| 3   | Hello  |
| ... | ...    |

With Beeline I access the SQL Endpoint (at port 1) of the
HiveThriftServer and perform a query. E.g.

SELECT * FROM TESTTABLE WHERE ID='3'

When I inspect the QueryLog of the DB with the SQL Statements executed I see

/*SQL #:100 t:657*/  *SELECT \"ID\",\"NAME\" FROM test;*

So no predicate pushdown happens, as the WHERE clause is missing.


# Questions

This gives rise to the following questions:

   1. *Why is no predicate pushdown performed?*
   2. *Can this be changed by not using registerTempTable? If so, how?*
   3. *Or is this a known restriction of the HiveThriftServer?*


# Counterexample

If I create a DataFrame `df` in Spark SQLContext and call

df.filter( df("ID") === 3).show()

I observe

/*SQL #:1*/SELECT \"ID\",\"NAME\" FROM test *WHERE ID = 3*;

as expected.
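
One thing that may be worth checking: the Beeline query compares the INT column with the string literal '3', while the counterexample compares it with the integer 3. If the analyzer wraps ID in a cast for the string comparison, the filter may no longer be translatable into a JDBC source filter. That is an assumption, not something confirmed in this thread. A minimal sketch to compare the two plans (the object and method names are made up; TESTTABLE is the temp table registered above):

```scala
import org.apache.spark.sql.SQLContext

object PushdownCheckSketch {
  /** Prints the plans for a string-literal and an integer-literal predicate on TESTTABLE. */
  def comparePlans(sqlContext: SQLContext): Unit = {
    // String literal against an INT column: the analyzer may wrap ID in a cast.
    sqlContext.sql("SELECT * FROM TESTTABLE WHERE ID = '3'").explain(true)
    // Integer literal: the comparison applies to the bare column.
    sqlContext.sql("SELECT * FROM TESTTABLE WHERE ID = 3").explain(true)
  }
}
```

Since HiveContext extends SQLContext, the `hiveContext` from the sketch above can be passed in directly.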


Re: how to handle OOMError from groupByKey

2015-09-28 Thread Fabien Martin
You can try to reduce the number of containers in order to increase their
memory.

2015-09-28 9:35 GMT+02:00 Akhil Das :

> You can try to increase the number of partitions to get rid of the OOM
> errors. Also try to use reduceByKey instead of groupByKey.
>
> Thanks
> Best Regards
>
> On Sat, Sep 26, 2015 at 1:05 AM, Elango Cheran 
> wrote:
>
>> Hi everyone,
>> I have an RDD of the format (user: String, timestamp: Long, state:
>> Boolean).  My task involves converting the states, where on/off is
>> represented as true/false, into intervals of 'on' of the format (beginTs:
>> Long, endTs: Long).  So this task requires me, per user, to line up all of
>> the on/off states so that I can compute when it is on, since the
>> calculation is neither associative nor commutative.
>>
>> So there are 2 main operations that I'm trying to accomplish together:
>> 1. group by each user
>> 2. sort by time -- keep all of the states in sorted order by time
>>
>> The main code inside the method that does grouping by user and sorting by
>> time looks sort of like this:
>>
>>
>> // RDD starts off in format (user, ts, state) of type RDD[(String, Long,
>> Boolean)]
>> val grouped = keyedStatesRDD.groupByKey
>> // after .groupByKey, format for RDD is (user, seq-of-(ts, state)) of
>> type RDD[(String, Iterable(Long, Boolean))]
>> // take the sequence of (ts, state) per user, sort, get intervals
>> val groupedIntervals = grouped.mapValues(
>>   states => {
>> val sortedStates = states.toSeq.sortBy(_._1)
>> val intervals = DFUtil.statesToIntervals(sortedStates)
>> val intervalsList = bucketDurations.map{case(k,v) =>
>> (k,v)}(collection.breakOut).sortBy(_._1)
>> intervalsList
>>   }
>> )
>> // after .mapValues, new format for RDD is (user, seq-of-(startTime,
>> endTime)) of type RDD[(String, IndexedSeq(Long, Long))]
>>
>>
>> When I run my Spark job with 1 day's worth of data, the job completes
>> successfully.  When I run with 1 month's or 1 year's worth of data, that
>> method is where my Spark job consistently crashes with
>> OutOfMemoryErrors.  I need to run on the full year's worth of data.
>>
>> My suspicion is that the groupByKey is the problem (it's pulling all of
>> the matching data values into a single executor's heap as a plain Scala
>> Iterable).  But alternatives of doing sortByKey on the RDD first before
>> grouping, or sortByKey followed by a fold[ByKey] or aggregate[ByKey] don't
>> quite apply in my scenario because my operation is not associative (can't
>> combine per-partition results) and I still need to group by users before
>> doing a foldLeft.
>>
>> I've definitely thought about the issue before and come across users with
>> issues that are similar but not exactly the same:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Folding-an-RDD-in-order-td16577.html
>>
>> http://mail-archives.us.apache.org/mod_mbox/spark-user/201501.mbox/%3ccaae1cqr8rd8ypebcmbjwfhm+lxh6nw4+r+uharx00psk_sh...@mail.gmail.com%3E
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Partition-sorting-by-Spark-framework-td18213.html
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Alternatives-to-groupByKey-td20293.html
>>
>> And this Jira seems relevant too:
>> https://issues.apache.org/jira/browse/SPARK-3655
>>
>> The amount of memory that I'm using is 2g per executor, and I can't go
>> higher than that because each executor gets a YARN container from nodes
>> with 16 GB of RAM and 5 YARN containers allowed per node.
>>
>> So I'd like to know if there's an easy solution to executing my logic on
>> my full dataset in Spark.
>>
>> Thanks!
>>
>> -- Elango
>>
>
>
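
One pattern that is often suggested for this kind of problem is a "secondary sort": key the records by (user, ts), partition by user only, let the shuffle sort each partition (in Spark, repartitionAndSortWithinPartitions with a custom Partitioner), and then walk each partition once with an iterator instead of materializing a per-user collection. The sketch below shows only the streaming part, in plain Scala. The object and function names are made up, and the rule that an interval starts at the first 'on' and ends at the next 'off' is an assumption about what DFUtil.statesToIntervals does.

```scala
object IntervalsSketch {
  /** Turns states sorted by (user, ts) into (user, beginTs, endTs) intervals,
    * consuming the input one element at a time. */
  def intervals(sorted: Iterator[((String, Long), Boolean)]): Iterator[(String, Long, Long)] = {
    var currentUser: String = null
    var onSince: Option[Long] = None
    sorted.flatMap { case ((user, ts), state) =>
      // A new user starts with a clean slate; an unterminated 'on' of the previous user is dropped.
      if (user != currentUser) { currentUser = user; onSince = None }
      (onSince, state) match {
        case (None, true) =>
          onSince = Some(ts) // the interval begins at the first 'on'
          Iterator.empty
        case (Some(begin), false) =>
          onSince = None // the interval ends at the next 'off'
          Iterator.single((user, begin, ts))
        case _ =>
          Iterator.empty // repeated 'on' or stray 'off': nothing to emit
      }
    }
  }
}
```

In a Spark job this function would be applied per partition (for example via mapPartitions) after the sorted repartitioning, so only one pending interval per partition is held in memory at a time.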


Re: Small File to HDFS

2015-09-03 Thread Martin Menzel
Hello Nicolas,

I solved a similar problem using FSDataOutputStream

http://blog.woopi.org/wordpress/files/hadoop-2.6.0-javadoc/org/apache/hadoop/fs/FSDataOutputStream.html

Each entry can be an ArrayWritable

http://blog.woopi.org/wordpress/files/hadoop-2.6.0-javadoc/org/apache/hadoop/io/ArrayWritable.html

so that you can put an arbitrary sequence of Writable subtypes into one
entry/element.

During my tests, the best performance was achieved with the
binary ZLIB library. GZIP or bzip2 delivered slightly smaller files, but
the processing time was several TIMES slower in my cases.

Regarding update and delete:

As far as I know, HDFS does not support update and delete. Tools like HBase
implement this by using several HDFS files and rewriting them from time to
time. Depending on how frequently you need to update / delete data, you can
think about housekeeping your HDFS file yourself. You delete an entry by
writing a delete flag somewhere else that refers to the key of your single data
entry. If you have too many deleted entries and you really want to clean up the HDFS
file, you can rewrite the HDFS file using MapReduce, for example.

In the update case you can use a similar technique. Just append the new /
updated version of your dataset and write a delete flag for the old version
somewhere else.
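
The housekeeping idea can be sketched in plain Scala as an in-memory model. This is a simplification, not HDFS code: the names are made up, and the delete flags are kept in the same log as the data instead of "somewhere else".

```scala
object TombstoneSketch {
  sealed trait Entry { def key: String }
  final case class Put(key: String, value: String) extends Entry
  final case class Delete(key: String) extends Entry

  /** Replays an append-only log in order and keeps only the latest live value per key. */
  def compact(log: Seq[Entry]): Map[String, String] =
    log.foldLeft(Map.empty[String, String]) {
      case (live, Put(k, v)) => live + (k -> v) // an update is just a newer Put
      case (live, Delete(k)) => live - k        // the delete flag hides all earlier versions
    }
}
```

The periodic rewrite of the HDFS file corresponds to running `compact` over the whole log and writing the surviving entries to a fresh file.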

In summary, you should weigh the complexity of your own hand-made
solution against an HBase solution (as mentioned before). If
you don't like key-value store databases, you can also have a look at
Phoenix on top of HBase, which delivers a very SQL-like access layer.

The only restriction I know of for HBase is that a single dataset should not
be bigger than the size of HDFS blocks.

I hope these comments help you. If you have questions, don't hesitate to
contact me.

Good luck

Martin



2015-09-03 16:17 GMT+02:00 <nib...@free.fr>:

> My main question in case of HAR usage is: is it possible to use Pig on it,
> and what about performance?
>
> - Original message -
> From: "Jörn Franke" <jornfra...@gmail.com>
> To: nib...@free.fr, user@spark.apache.org
> Sent: Thursday, 3 September 2015 15:54:42
> Subject: Re: Small File to HDFS
>
>
>
>
> Store them as hadoop archive (har)
>
>
> On Wed, 2 Sept 2015 at 18:07, < nib...@free.fr > wrote:
>
>
> Hello,
> I'm currently using Spark Streaming to collect small messages (events) ,
> size being <50 KB , volume is high (several millions per day) and I have to
> store those messages in HDFS.
> I understood that storing small files can be problematic in HDFS , how can
> I manage it ?
>
> Tks
> Nicolas
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>


When will window ....

2015-08-10 Thread Martin Senne

When will window functions be integrated into Spark (without HiveContext?)



On 10 August 2015 23:04:22, Michael Armbrust mich...@databricks.com wrote:


You will need to use a HiveContext for window functions to work.
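
A minimal sketch of what that could look like with the DataFrame API in Spark 1.4. The column names "key", "ts" and "value" and the object name are hypothetical, and the DataFrame is assumed to have been created through a HiveContext rather than a plain SQLContext.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag

object LagSketch {
  /** Adds a column holding the previous `value` within each `key`, ordered by `ts`.
    * `df` is assumed to have been created through a HiveContext. */
  def withPreviousValue(df: DataFrame): DataFrame = {
    val byKey = Window.partitionBy("key").orderBy("ts")
    df.withColumn("prev_value", lag(df("value"), 1).over(byKey))
  }
}
```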

On Mon, Aug 10, 2015 at 1:26 PM, Jerry jerry.c...@gmail.com wrote:

 Hello,

 Using Apache Spark 1.4.1 I'm unable to use lag or lead when making queries
 to a data frame and I'm trying to figure out if I just have a bad setup or
 if this is a bug. As for the exceptions I get: when using selectExpr() with
 a string as an argument, I get NoSuchElementException: key not found: lag
 and when using the select method and ...spark.sql.functions.lag I get an
 AnalysisException. If I replace lag with abs in the first case, Spark runs
 without exception, so none of the other syntax is incorrect.

 As for how I'm running it; the code is written in Java with a static
 method that takes the SparkContext as an argument which is used to create a
 JavaSparkContext which then is used to create an SQLContext which loads a
 json file from the local disk and runs those queries on that data frame
 object. FYI: the java code is compiled, jarred and then pointed to with -cp
 when starting the spark shell, so all I do is Test.run(sc) in shell.

 Let me know what to look for to debug this problem. I'm not sure where to
 look to solve this problem.

 Thanks,
 Jerry



Re: Spark SQL DataFrame: Nullable column and filtering

2015-08-01 Thread Martin Senne
Dear all,

after some fiddling I have arrived at this solution:

/**
 * Customized left outer join on common column.
 */
def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame,
    commonColumnName: String): DataFrame = {
  val joinedDF = leftDF.as('left).join(rightDF.as('right),
    leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")

  import joinedDF.sqlContext.implicits._
  // Address the columns through the aliases so they resolve against the joined table.
  val leftColumns = leftDF.columns
    .map((cn: String) => $"left.$cn")
  val rightColumns = rightDF.columns.filterNot(cn => cn.equals(commonColumnName))
    .map((cn: String) => $"right.$cn")

  joinedDF.select(leftColumns ++ rightColumns: _*)
}

Comments welcome

Alternatives I tried:

   - Not working: If at least the right alias for rightDF is present, one
   could try

   joinedDF.drop("right." + columnName)

   but this does not work (no column is dropped).
   Unfortunately, drop does not support arguments of type Column /
   ColumnName. *@Michael: Should I create a feature request in Jira for
   drop supporting Columns?*

   - Working: Without using aliases via as(...), but using column
   renaming instead:

   rightDF.withColumnRenamed(commonColumnName, "right_" + commonColumnName)

   to rename the right DataFrame column, and then use the join criterion

   leftDF(commonColumnName) === rightDF("right_" + commonColumnName)

   In my opinion not so neat. Opinions?
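
Spelled out, the renaming variant could look roughly like the sketch below. The object and function names are made up, and the final drop relies on the renamed column name being unique in the joined table.

```scala
import org.apache.spark.sql.DataFrame

object RenameJoinSketch {
  /** Left outer join on a common column; the right-hand copy of that column is
    * renamed before the join and dropped afterwards. */
  def leftOuterJoinViaRename(leftDF: DataFrame, rightDF: DataFrame,
      commonColumnName: String): DataFrame = {
    val renamed = "right_" + commonColumnName
    val renamedRightDF = rightDF.withColumnRenamed(commonColumnName, renamed)
    leftDF
      .join(renamedRightDF, leftDF(commonColumnName) === renamedRightDF(renamed), "leftouter")
      .drop(renamed) // unambiguous now, so drop by name works
  }
}
```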


Things I observed:

   - Column handling does not seem consistent:
      - select(...) supports aliases, while drop(...) only supports
      strings.
      - DataFrame.apply(...) and DataFrame.col do not support aliases either.
      - Thus the only way to handle ambiguous column names is via select at
      the moment. Can someone please confirm this?
      - Alias information is not displayed via DataFrame.printSchema (or
      at least I did not find a way to display it).

Cheers,

Martin

2015-07-31 22:51 GMT+02:00 Martin Senne martin.se...@googlemail.com:

 Dear Michael, dear all,

 a minimal example is listed below.

 After some further analysis I could figure out, that the problem is
 related to the *leftOuterJoinWithRemovalOfEqualColumn*-Method, as I use
 columns of the left and right dataframes when doing the select on the
 joined table.

   /**
    * Customized left outer join on common column.
    */
   def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame,
       commonColumnName: String): DataFrame = {

     val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
     val rightColumns = rightDF.columns.filterNot(cn =>
       cn.equals(commonColumnName)).map(cn => rightDF(cn))

     leftDF.join(rightDF, leftDF(commonColumnName) ===
       rightDF(commonColumnName), "leftouter")
       .select(leftColumns ++ rightColumns: _*)
   }

 As the column y of the right table has nullable=false, this is then also
 transferred to the y column of the joined table, as I use rightDF("y").

 Thus, I need to use columns of the joined table for the select.

 *Question now: The joined table has column names x, a, x, y. How do I 
 discard the second x column?*

 All my approaches failed (assuming here that joinedDF is the joined
 DataFrame):


- Using joinedDF.drop("x") discards both x columns.
- Using joinedDF("x") does not work, as it is ambiguous.
- Also, using rightDF.as("aliasname") in order to differentiate the
column x (from the left DataFrame) from x (from the right DataFrame) did not
work out, as I found no way to use select($"aliasname.x") really
programmatically. Could someone sketch the code?

 Any help welcome, thanks


 Martin



 
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.{SparkContext, SparkConf}
 import org.apache.spark.sql.{DataFrame, SQLContext}

 object OtherEntities {

   case class Record( x:Int, a: String)
   case class Mapping( x: Int, y: Int )

   val records = Seq( Record(1, "hello"), Record(2, "bob"))
   val mappings = Seq( Mapping(2, 5) )
 }

 object MinimalShowcase {

   /**
* Customized left outer join on common column.
*/
   def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame,
       commonColumnName: String): DataFrame = {

     val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
     val rightColumns = rightDF.columns.filterNot(cn =>
       cn.equals(commonColumnName)).map(cn => rightDF(cn))

     leftDF.join(rightDF, leftDF(commonColumnName) ===
       rightDF(commonColumnName), "leftouter")
       .select(leftColumns ++ rightColumns: _*)
   }


   /**
* Set, if a column is nullable.
* @param df source DataFrame
* @param cn is the column name to change
* @param nullable is the flag to set, such that the column is either 
 nullable or not
*/
   def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) 
 : DataFrame = {

 val schema = df.schema
 val newSchema = StructType(schema.map {
   case StructField( c, t, _, m) if c.equals(cn) = StructField( c, t

Re: Spark SQL DataFrame: Nullable column and filtering

2015-07-31 Thread Martin Senne
Dear Michael, dear all,

a minimal example is listed below.

After some further analysis I could figure out, that the problem is related
to the *leftOuterJoinWithRemovalOfEqualColumn*-Method, as I use columns of
the left and right dataframes when doing the select on the joined table.

  /**
   * Customized left outer join on common column.
   */
  def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame,
      commonColumnName: String): DataFrame = {

    val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
    val rightColumns = rightDF.columns.filterNot(cn =>
      cn.equals(commonColumnName)).map(cn => rightDF(cn))

    leftDF.join(rightDF, leftDF(commonColumnName) ===
      rightDF(commonColumnName), "leftouter")
      .select(leftColumns ++ rightColumns: _*)
  }

As the column y of the right table has nullable=false, this is then
also transferred to the y column of the joined table, as I use
rightDF("y").

Thus, I need to use columns of the joined table for the select.

*Question now: The joined table has column names x, a, x, y.
How do I discard the second x column?*

All my approaches failed (assuming here that joinedDF is the joined DataFrame):


   - Using joinedDF.drop("x") discards both x columns.
   - Using joinedDF("x") does not work, as it is ambiguous.
   - Also, using rightDF.as("aliasname") in order to differentiate the
   column x (from the left DataFrame) from x (from the right DataFrame) did not
   work out, as I found no way to use select($"aliasname.x") really
   programmatically. Could someone sketch the code?

Any help welcome, thanks


Martin




import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{DataFrame, SQLContext}

object OtherEntities {

  case class Record( x:Int, a: String)
  case class Mapping( x: Int, y: Int )

  val records = Seq( Record(1, "hello"), Record(2, "bob"))
  val mappings = Seq( Mapping(2, 5) )
}

object MinimalShowcase {

  /**
   * Customized left outer join on common column.
   */
  def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame,
      commonColumnName: String): DataFrame = {

    val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
    val rightColumns = rightDF.columns.filterNot(cn =>
      cn.equals(commonColumnName)).map(cn => rightDF(cn))

    leftDF.join(rightDF, leftDF(commonColumnName) ===
      rightDF(commonColumnName), "leftouter")
      .select(leftColumns ++ rightColumns: _*)
  }


  /**
   * Set, if a column is nullable.
   * @param df source DataFrame
   * @param cn is the column name to change
   * @param nullable is the flag to set, such that the column is
   *                 either nullable or not
   */
  def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = {

    val schema = df.schema
    val newSchema = StructType(schema.map {
      case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t, nullable = nullable, m)
      case y: StructField => y
    })
    df.sqlContext.createDataFrame( df.rdd, newSchema)
  }


  def main (args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("Minimal")
      .setMaster("local[*]")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // used to implicitly convert an RDD to a DataFrame.
    import sqlContext.implicits._

    val recordDF = sc.parallelize(OtherEntities.records, 4).toDF()
    val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()
    val mappingWithNullDF = setNullableStateOfColumn(mappingDF, "y", true)

    val joinedDF = recordDF.join(mappingDF, recordDF("x") ===
      mappingDF("x"), "leftouter")
    println("joinedDF:")
    joinedDF.show
    joinedDF.printSchema
    joinedDF.filter(joinedDF("y").isNotNull).show

    //joinedDF:
    //+-+-----+----+----+
    //|x|    a|   x|   y|
    //+-+-----+----+----+
    //|1|hello|null|null|
    //|2|  bob|   2|   5|
    //+-+-----+----+----+
//
//root
//|-- x: integer (nullable = false)
//|-- a: string (nullable = true)
//|-- x: integer (nullable = true)
//|-- y: integer (nullable = true)
//
//+-+---+-+-+
//|x|  a|x|y|
//+-+---+-+-+
//|2|bob|2|5|
//+-+---+-+-+


    val extrajoinedDF =
      leftOuterJoinWithRemovalOfEqualColumn(recordDF, mappingDF, "x")
    println("extrajoinedDF:")
    extrajoinedDF.show
    extrajoinedDF.printSchema
    extrajoinedDF.filter(extrajoinedDF("y").isNotNull).show

    //extrajoinedDF:
    //+-+-----+----+
    //|x|    a|   y|
    //+-+-----+----+
    //|1|hello|null|
    //|2|  bob|   5|
    //+-+-----+----+
//
//root
//|-- x: integer (nullable = false)
//|-- a: string (nullable = true)
//|-- y: integer (nullable = false)
//
    //+-+-----+----+
    //|x|    a|   y|
    //+-+-----+----+
    //|1|hello|null|
    //|2|  bob|   5|
    //+-+-----+----+



    val joined2DF = recordDF.join(mappingWithNullDF, recordDF("x") ===
      mappingWithNullDF("x"), "leftouter")
println(joined2DF

Re: Spark SQL DataFrame: Nullable column and filtering

2015-07-30 Thread Martin Senne
Dear Michael, dear all,

distinguishing those records that have a match in mapping from those that
don't is the crucial point.

Record(x: Int, a: String)
Mapping(x: Int, y: Int)

Thus

Record(1, "hello")
Record(2, "bob")
Mapping(2, 5)

yield (2, "bob", 5) on an inner join.
BUT I'm also interested in (1, "hello", null), as there is no counterpart in
mapping (this is the left outer join part).

I need to distinguish 1 and 2 because of later inserts (case 1, "hello") or
updates (case 2, "bob").

Cheers and thanks,

Martin
On 30.07.2015 22:58, Michael Armbrust mich...@databricks.com wrote:

 Perhaps I'm missing what you are trying to accomplish, but if you'd like
to avoid the null values do an inner join instead of an outer join.

 Additionally, I'm confused about how the result
of joinedDF.filter(joinedDF(y).isNotNull).show still contains null values
in the column y. This doesn't really have anything to do with nullable,
which is only a hint to the system so that we can avoid null checking when
we know that there are no null values. If you provide the full code i can
try and see if this is a bug.

 On Thu, Jul 30, 2015 at 11:53 AM, Martin Senne 
martin.se...@googlemail.com wrote:

 Dear Michael, dear all,

 motivation:

 object OtherEntities {

   case class Record( x:Int, a: String)
   case class Mapping( x: Int, y: Int )

   val records = Seq( Record(1, "hello"), Record(2, "bob"))
   val mappings = Seq( Mapping(2, 5) )
 }

 Now I want to perform a left outer join on records and mappings (with
the join criterion on the columns recordDF("x") === mappingDF("x")). The
shorthand is leftOuterJoinWithRemovalOfEqualColumn:

 val sqlContext = new SQLContext(sc)
 // used to implicitly convert an RDD to a DataFrame.
 import sqlContext.implicits._

 val recordDF= sc.parallelize(OtherEntities.records, 4).toDF()
 val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()

 val joinedDF = recordDF.leftOuterJoinWithRemovalOfEqualColumn(
mappingDF, "x")

 joinedDF.filter(joinedDF("y").isNotNull).show


 Currently, the output is


+-+-++

 |x|a|   y|
 +-+-++
 |1|hello|null|
 |2|  bob|   5|
 +-+-++

 instead of


+-+---+-+

 |x|  a|y|
 +-+---+-+
 |2|bob|5|
 +-+---+-+

 The last output can be achieved by the method of changing nullable=false
to nullable=true described in my first post.

 Thus, I need this schema modification as to make outer joins work.

 Cheers and thanks,

 Martin



 2015-07-30 20:23 GMT+02:00 Michael Armbrust mich...@databricks.com:

 We don't yet updated nullability information based on predicates as we
don't actually leverage this information in many places yet.  Why do you
want to update the schema?

 On Thu, Jul 30, 2015 at 11:19 AM, martinibus77 
martin.se...@googlemail.com wrote:

 Hi all,

 1. *Columns in dataframes can be nullable and not nullable. Having a
 nullable column of Doubles, I can use the following Scala code to
filter all
 non-null rows:*

   val df = . // some code that creates a DataFrame
   df.filter( df(columnname).isNotNull() )

 +-+-++
 |x|a|   y|
 +-+-++
 |1|hello|null|
 |2|  bob|   5|
 +-+-++

 root
  |-- x: integer (nullable = false)
  |-- a: string (nullable = true)
  |-- y: integer (nullable = true)

 And with the filter expression
 +-+---+-+
 |x|  a|y|
 +-+---+-+
 |2|bob|5|
 +-+---+-+


 Unfortunetaly and while this is a true for a nullable column
(according to
 df.printSchema), it is not true for a column that is not nullable:


 +-+-++
 |x|a|   y|
 +-+-++
 |1|hello|null|
 |2|  bob|   5|
 +-+-++

 root
  |-- x: integer (nullable = false)
  |-- a: string (nullable = true)
  |-- y: integer (nullable = false)

 +-+-++
 |x|a|   y|
 +-+-++
 |1|hello|null|
 |2|  bob|   5|
 +-+-++

 such that the output is not affected by the filter. Is this intended?


 2. *What is the cheapest (in sense of performance) to turn a
non-nullable
 column into a nullable column?
 A came uo with this:*

   /**
* Set, if a column is nullable.
* @param df source DataFrame
* @param cn is the column name to change
* @param nullable is the flag to set, such that the column is either
 nullable or not
*/
   def setNullableStateOfColumn( df: DataFrame, cn: String, nullable:
 Boolean) : DataFrame = {

 val schema = df.schema
 val newSchema = StructType(schema.map {
   case StructField( c, t, _, m) if c.equals(cn) = StructField( c,
t,
 nullable = nullable, m)
   case y: StructField = y
 })
 df.sqlContext.createDataFrame( df.rdd, newSchema)
   }

 Is there a cheaper solution?

 3. *Any comments?*

 Cheers and thx in advance,

 Martin






 --
 View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-DataFrame-Nullable-column-and-filtering-tp24087.html
 Sent from the Apache Spark User List mailing list archive at
Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr

Re: Spark SQL DataFrame: Nullable column and filtering

2015-07-30 Thread Martin Senne
Dear Michael, dear all,

motivation:

object OtherEntities {

  case class Record( x:Int, a: String)
  case class Mapping( x: Int, y: Int )

  val records = Seq( Record(1, "hello"), Record(2, "bob"))
  val mappings = Seq( Mapping(2, 5) )
}

Now I want to perform a *left outer join* on records and mappings
(with the join criterion on the columns recordDF("x") ===
mappingDF("x")). The shorthand is
*leftOuterJoinWithRemovalOfEqualColumn*:

val sqlContext = new SQLContext(sc)
// used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

val recordDF= sc.parallelize(OtherEntities.records, 4).toDF()
val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()

val joinedDF = recordDF.leftOuterJoinWithRemovalOfEqualColumn( mappingDF, "x")

joinedDF.filter(joinedDF("y").isNotNull).show


Currently, the output is

+-+-----+----+
|x|    a|   y|
+-+-----+----+
|1|hello|null|
|2|  bob|   5|
+-+-----+----+

instead of

+-+---+-+

|x|  a|y|
+-+---+-+
|2|bob|5|
+-+---+-+

The last output can be achieved by the method of changing nullable=false to
nullable=true described in my first post.

*Thus, I need this schema modification in order to make outer joins work.*

Cheers and thanks,

Martin



2015-07-30 20:23 GMT+02:00 Michael Armbrust mich...@databricks.com:

 We don't yet update nullability information based on predicates as we
 don't actually leverage this information in many places yet.  Why do you
 want to update the schema?

 On Thu, Jul 30, 2015 at 11:19 AM, martinibus77 
 martin.se...@googlemail.com wrote:

 Hi all,

 1. *Columns in DataFrames can be nullable or not nullable. Having a
 nullable column of Doubles, I can use the following Scala code to filter
 all non-null rows:*

   val df = ... // some code that creates a DataFrame
   df.filter( df("columnname").isNotNull )

 +---+-----+----+
 |  x|    a|   y|
 +---+-----+----+
 |  1|hello|null|
 |  2|  bob|   5|
 +---+-----+----+

 root
  |-- x: integer (nullable = false)
  |-- a: string (nullable = true)
  |-- y: integer (nullable = true)

 And with the filter expression
 +---+---+---+
 |  x|  a|  y|
 +---+---+---+
 |  2|bob|  5|
 +---+---+---+


 Unfortunately, while this is true for a nullable column (according to
 df.printSchema), it is not true for a column that is not nullable:


 +---+-----+----+
 |  x|    a|   y|
 +---+-----+----+
 |  1|hello|null|
 |  2|  bob|   5|
 +---+-----+----+

 root
  |-- x: integer (nullable = false)
  |-- a: string (nullable = true)
  |-- y: integer (nullable = false)

 +---+-----+----+
 |  x|    a|   y|
 +---+-----+----+
 |  1|hello|null|
 |  2|  bob|   5|
 +---+-----+----+

 such that the output is not affected by the filter. Is this intended?
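
 One way to picture this behaviour is an engine that trusts the schema flag instead of inspecting the data. The following is a toy plain-Scala model under that assumption; the thread does not confirm that this is how Spark SQL handles it, and NullabilitySketch, Column and filterNotNull are hypothetical names.

```scala
object NullabilitySketch {
  // Toy column metadata: the flag is a promise made by the schema,
  // not a property that is checked against the data.
  case class Column(name: String, nullable: Boolean)
  type Row = Map[String, Option[Int]]

  // Toy "IS NOT NULL" filter: if the schema promises that the column
  // holds no nulls, the test is assumed to be always true and is skipped.
  def filterNotNull(rows: Seq[Row], col: Column): Seq[Row] =
    if (!col.nullable) rows
    else rows.filter(row => row.get(col.name).flatten.isDefined)
}
```

 Under this model the same data gives one row when y is declared nullable and two rows when it is declared non-nullable, which matches the two outputs above.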


 2. *What is the cheapest way (in terms of performance) to turn a non-nullable
 column into a nullable column?
 I came up with this:*

   import org.apache.spark.sql.DataFrame
   import org.apache.spark.sql.types.{StructField, StructType}

   /**
    * Set whether a column is nullable.
    * @param df source DataFrame
    * @param cn is the column name to change
    * @param nullable is the flag to set, such that the column is either
    *                 nullable or not
    */
   def setNullableStateOfColumn(df: DataFrame, cn: String, nullable: Boolean): DataFrame = {
     val schema = df.schema
     // copy every field, replacing only the nullable flag of the column cn
     val newSchema = StructType(schema.map {
       case StructField(c, t, _, m) if c.equals(cn) => StructField(c, t, nullable = nullable, m)
       case y: StructField => y
     })
     df.sqlContext.createDataFrame(df.rdd, newSchema)
   }

 Is there a cheaper solution?

 3. *Any comments?*

 Cheers and thx in advance,

 Martin











Re: Spark ML Pipeline inaccessible types

2015-03-25 Thread zapletal-martin
Thanks Peter,



I ended up doing something similar. However, I consider both of the approaches
you mentioned bad practice, which is why I was looking for a solution
directly supported by the current code.




I can work with that now, but it does not seem to be the proper solution.




Regards,

Martin





-- Original message --
From: Peter Rudenko petro.rude...@gmail.com
To: zapletal-mar...@email.cz, Sean Owen so...@cloudera.com
Date: 25. 3. 2015 13:28:38
Subject: Re: Spark ML Pipeline inaccessible types




 Hi Martin, here’s 2 possibilities to overcome this:

 1) Put your logic into org.apache.spark package in your project - then 
 everything would be accessible.
 2) Dirty trick:

 object SparkVector extends HashingTF {
   val VectorUDT: DataType = outputDataType
 }


 then you can do like this:

 StructType(vectorTypeColumn, SparkVector.VectorUDT, false))


 Thanks,
 Peter Rudenko

 On 2015-03-25 13:14, zapletal-mar...@email.cz wrote:

 



Sean, 



thanks for your response. I am familiar with NoSuchMethodException in
general, but I do not think that is the cause this time. The code actually
attempts to look up a parameter by name using
val m = this.getClass.getMethod(paramName).




This may be a bug, but it is only a side effect caused by the real problem I
am facing. My issue is that VectorUDT is not accessible by user code and 
therefore it is not possible to use custom ML pipeline with the existing 
Predictors (see the last two paragraphs in my first email).




Best Regards,

Martin



-- Original message --
From: Sean Owen so...@cloudera.com
To: zapletal-mar...@email.cz
Date: 25. 3. 2015 11:05:54
Subject: Re: Spark ML Pipeline inaccessible types

NoSuchMethodError in general means that your runtime and compile-time
environments are different. I think you need to first make sure you
don't have mismatching versions of Spark.

On Wed, Mar 25, 2015 at 11:00 AM, zapletal-mar...@email.cz wrote:
 Hi,

 I have started implementing a machine learning pipeline using Spark 1.3.0
 and the new pipelining API and DataFrames. I got to a point where I have my
 training data set prepared using a sequence of Transformers, but I am
 struggling to actually train a model and use it for predictions.

 I am getting a java.lang.NoSuchMethodException:
 org.apache.spark.ml.regression.LinearRegression.myFeaturesColumnName()
 exception thrown at checkInputColumn method in Params trait when using a
 Predictor (LinearRegression in my case, but that should not matter). This
 looks like a bug - the exception is thrown when executing getParam(colName)
 when the require(actualDataType.equals(datatype), ...) requirement is not
 met so the expected requirement failed exception is not thrown and is hidden
 by the unexpected NoSuchMethodException instead. I can raise a bug if this
 really is an issue and I am not using something incorrectly.

 The problem I am facing however is that the Predictor expects features to
 have VectorUDT type as defined in Predictor class (protected def
 featuresDataType: DataType = new VectorUDT). But since this type is
 private[spark] my Transformer can not prepare features with this type which
 then correctly results in the exception above when I use a different type.

 Is there a way to define a custom Pipeline that would be able to use the
 existing Predictors without having to bypass the access modifiers or
 reimplement something or is the pipelining API not yet expected to be used
 in this way?

 Thanks,
 Martin



 



 






Re: Spark ML Pipeline inaccessible types

2015-03-25 Thread zapletal-martin
Sean,



thanks for your response. I am familiar with NoSuchMethodException in
general, but I do not think that is the cause this time. The code actually
attempts to look up a parameter by name using
val m = this.getClass.getMethod(paramName).




This may be a bug, but it is only a side effect caused by the real problem I
am facing. My issue is that VectorUDT is not accessible by user code and 
therefore it is not possible to use custom ML pipeline with the existing 
Predictors (see the last two paragraphs in my first email).




Best Regards,

Martin



-- Original message --
From: Sean Owen so...@cloudera.com
To: zapletal-mar...@email.cz
Date: 25. 3. 2015 11:05:54
Subject: Re: Spark ML Pipeline inaccessible types

NoSuchMethodError in general means that your runtime and compile-time
environments are different. I think you need to first make sure you
don't have mismatching versions of Spark.

On Wed, Mar 25, 2015 at 11:00 AM, zapletal-mar...@email.cz wrote:
 Hi,

 I have started implementing a machine learning pipeline using Spark 1.3.0
 and the new pipelining API and DataFrames. I got to a point where I have my
 training data set prepared using a sequence of Transformers, but I am
 struggling to actually train a model and use it for predictions.

 I am getting a java.lang.NoSuchMethodException:
 org.apache.spark.ml.regression.LinearRegression.myFeaturesColumnName()
 exception thrown at checkInputColumn method in Params trait when using a
 Predictor (LinearRegression in my case, but that should not matter). This
 looks like a bug - the exception is thrown when executing getParam(colName)
 when the require(actualDataType.equals(datatype), ...) requirement is not
 met so the expected requirement failed exception is not thrown and is hidden
 by the unexpected NoSuchMethodException instead. I can raise a bug if this
 really is an issue and I am not using something incorrectly.

 The problem I am facing however is that the Predictor expects features to
 have VectorUDT type as defined in Predictor class (protected def
 featuresDataType: DataType = new VectorUDT). But since this type is
 private[spark] my Transformer can not prepare features with this type which
 then correctly results in the exception above when I use a different type.

 Is there a way to define a custom Pipeline that would be able to use the
 existing Predictors without having to bypass the access modifiers or
 reimplement something or is the pipelining API not yet expected to be used
 in this way?

 Thanks,
 Martin





Spark ML Pipeline inaccessible types

2015-03-25 Thread zapletal-martin
Hi,



I have started implementing a machine learning pipeline using Spark 1.3.0 
and the new pipelining API and DataFrames. I got to a point where I have my 
training data set prepared using a sequence of Transformers, but I am 
struggling to actually train a model and use it for predictions.




I am getting a java.lang.NoSuchMethodException:
org.apache.spark.ml.regression.LinearRegression.myFeaturesColumnName()
exception thrown at the checkInputColumn method in the Params trait when using
a Predictor (LinearRegression in my case, but that should not matter). This
looks like a bug: the exception is thrown when executing getParam(colName)
when the require(actualDataType.equals(datatype), ...) requirement is not met,
so the expected "requirement failed" exception is not thrown and is hidden by
the unexpected NoSuchMethodException instead. I can raise a bug if this really
is an issue and I am not using something incorrectly.
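
The masking effect described above can be reproduced with plain Java reflection. This is a minimal sketch, assuming parameters are exposed as zero-argument methods and looked up by name; Estimator and ReflectionLookup are hypothetical stand-ins and not Spark's Params code.

```scala
// Hypothetical stand-in for a class that exposes parameters as
// zero-argument methods.
class Estimator {
  def featuresCol: String = "features"
}

object ReflectionLookup {
  // Look up a public zero-argument method by name and invoke it.
  // A name with no matching method surfaces as NoSuchMethodException,
  // which is what hides any more specific error the caller intended.
  def lookup(target: AnyRef, name: String): Either[String, AnyRef] =
    try Right(target.getClass.getMethod(name).invoke(target))
    catch {
      case e: NoSuchMethodException =>
        Left("NoSuchMethodException: " + e.getMessage)
    }
}
```

Looking up "featuresCol" succeeds, while looking up a column name such as "myFeaturesColumnName" fails inside the reflective call, before any later type check has a chance to report its own error.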




The problem I am facing, however, is that the Predictor expects features to
have the VectorUDT type as defined in the Predictor class (protected def
featuresDataType: DataType = new VectorUDT). But since this type is
private[spark], my Transformer cannot prepare features with this type, which
then correctly results in the exception above when I use a different type.




Is there a way to define a custom Pipeline that would be able to use the 
existing Predictors without having to bypass the access modifiers or 
reimplement something or is the pipelining API not yet expected to be used 
in this way?




Thanks,

Martin








Re: DataFrame operation on parquet: GC overhead limit exceeded

2015-03-23 Thread Martin Goodson
Have you tried to repartition() your original data to make more partitions
before you aggregate?


-- 
Martin Goodson  |  VP Data Science
(0)20 3397 1240

On Mon, Mar 23, 2015 at 4:12 PM, Yiannis Gkoufas johngou...@gmail.com
wrote:

 Hi Yin,

 Yes, I have set spark.executor.memory to 8g and the worker memory to 16g
 without any success.
 I cannot figure out how to increase the number of mapPartitions tasks.

 Thanks a lot

 On 20 March 2015 at 18:44, Yin Huai yh...@databricks.com wrote:

 spark.sql.shuffle.partitions only controls the number of tasks in the
 second stage (the number of reducers). For your case, I'd say that the
 number of tasks in the first stage (number of mappers) will be the number
 of files you have.

 Actually, have you changed spark.executor.memory (it controls the
 memory for an executor of your application)? I did not see it in your
 original email. The difference between worker memory and executor memory
 can be found at (http://spark.apache.org/docs/1.3.0/spark-standalone.html
 ),

 SPARK_WORKER_MEMORY
 Total amount of memory to allow Spark applications to use on the machine,
 e.g. 1000m, 2g (default: total memory minus 1 GB); note that each
 application's individual memory is configured using its
 spark.executor.memory property.


 On Fri, Mar 20, 2015 at 9:25 AM, Yiannis Gkoufas johngou...@gmail.com
 wrote:

 Actually I realized that the correct way is:

 sqlContext.sql("set spark.sql.shuffle.partitions=1000")

 but I am still experiencing the same behavior/error.

 On 20 March 2015 at 16:04, Yiannis Gkoufas johngou...@gmail.com wrote:

 Hi Yin,

 the way I set the configuration is:

 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 sqlContext.setConf("spark.sql.shuffle.partitions", "1000")

 Is this the correct way?
 In the mapPartitions task (the first task which is launched), I get
 again the same number of tasks and again the same error. :(

 Thanks a lot!

 On 19 March 2015 at 17:40, Yiannis Gkoufas johngou...@gmail.com
 wrote:

 Hi Yin,

 thanks a lot for that! Will give it a shot and let you know.

 On 19 March 2015 at 16:30, Yin Huai yh...@databricks.com wrote:

 Was the OOM thrown during the execution of first stage (map) or the
 second stage (reduce)? If it was the second stage, can you increase the
 value of spark.sql.shuffle.partitions and see if the OOM disappears?

 This setting controls the number of reducers Spark SQL will use and
 the default is 200. Maybe there are too many distinct values and the
 memory pressure on every task (of those 200 reducers) is pretty high. You
 can start with 400 and increase it until the OOM disappears. Hopefully this
 will help.

 Thanks,

 Yin
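
 The effect of the setting can be estimated with simple arithmetic. This is a rough sketch, assuming one map task per input file and groups that hash evenly across reducers; StageSizing and its methods are hypothetical helpers, and the number of distinct groups is not stated in this thread, so it is left as an input.

```scala
object StageSizing {
  // Average rows handled by each first-stage (map) task when there is
  // one task per input file.
  def rowsPerMapTask(totalRows: Long, inputFiles: Int): Long =
    totalRows / inputFiles

  // Average distinct groups held by each second-stage (reduce) task,
  // assuming the groups are spread evenly over the shuffle partitions.
  def groupsPerReduceTask(distinctGroups: Long, shufflePartitions: Int): Long =
    math.ceil(distinctGroups.toDouble / shufflePartitions).toLong
}
```

 For the 16 billion rows in 1700 files mentioned in this thread, rowsPerMapTask gives about 9.4 million rows per map task; raising the shuffle partitions from 200 to 1000 divides the per-reducer group count by five but leaves the map side unchanged.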


 On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas 
 johngou...@gmail.com wrote:

 Hi Yin,

 Thanks for your feedback. I have 1700 parquet files, sized 100MB
 each. The number of tasks launched is equal to the number of parquet
 files. Do you have any idea on how to deal with this situation?

 Thanks a lot
 On 18 Mar 2015 17:35, Yin Huai yh...@databricks.com wrote:

 It seems there are too many distinct groups processed in a task, which
 triggers the problem.

 How many files does your dataset have and how large is a file? It seems
 your query will be executed in two stages: table scan and map-side
 aggregation in the first stage, and the final round of reduce-side
 aggregation in the second stage. Can you take a look at the number of
 tasks launched in these two stages?

 Thanks,

 Yin

 On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas 
 johngou...@gmail.com wrote:

 Hi there, I set the executor memory to 8g but it didn't help

 On 18 March 2015 at 13:59, Cheng Lian lian.cs@gmail.com
 wrote:

 You should probably increase executor memory by setting
 spark.executor.memory.

 Full list of available configurations can be found here
 http://spark.apache.org/docs/latest/configuration.html

 Cheng


 On 3/18/15 9:15 PM, Yiannis Gkoufas wrote:

 Hi there,

 I was trying the new DataFrame API with some basic operations on
 a parquet dataset.
 I have 7 nodes of 12 cores and 8GB RAM allocated to each worker
 in a standalone cluster mode.
 The code is the following:

 val people = sqlContext.parquetFile("/data.parquet");
 val res = people.groupBy("name", "date").
   agg(sum("power"), sum("supply")).take(10);
 System.out.println(res);

 The dataset consists of 16 billion entries.
 The error I get is java.lang.OutOfMemoryError: GC overhead limit
 exceeded

 My configuration is:

 spark.serializer                 org.apache.spark.serializer.KryoSerializer
 spark.driver.memory              6g
 spark.executor.extraJavaOptions  -XX:+UseCompressedOops
 spark.shuffle.manager            sort

 Any idea how I can work around this?

 Thanks a lot













Solving linear equations

2014-10-22 Thread Martin Enzinger
Hi,

I'm wondering how to use MLlib for solving equation systems following this
pattern:

2*x1 + x2 + 3*x3 + ... + xn = 0
x1 + 0*x2 + 3*x3 + ... + xn = 0
...
...
0*x1 + x2 + 0*x3 + ... + xn = 0

I definitely still have some reading to do to really understand the direct
solving techniques, but with my current state of knowledge, SVD could help
me with this, right?

Can you point me to an example or a tutorial?

best regards
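
A system of this form is homogeneous (A x = 0), so its non-trivial solutions are the null space of A. SVD exposes that space as the right singular vectors belonging to zero singular values. The sketch below does not use SVD or MLlib: it swaps in plain Gauss-Jordan elimination on a small local matrix, purely to show what a solution looks like. NullSpace and basis are hypothetical names.

```scala
object NullSpace {
  // Returns a basis of { x : A x = 0 } using Gauss-Jordan elimination
  // with partial pivoting. Entries below eps are treated as zero.
  def basis(a: Array[Array[Double]], eps: Double = 1e-9): Seq[Array[Double]] = {
    val m = a.map(_.clone)
    val rows = m.length
    val cols = if (rows == 0) 0 else m(0).length
    var pivotCols = List.empty[Int] // pivot column of each pivot row, newest first
    var r = 0
    var c = 0
    while (c < cols && r < rows) {
      // choose the row with the largest absolute entry in column c
      val p = (r until rows).maxBy(i => math.abs(m(i)(c)))
      if (math.abs(m(p)(c)) > eps) {
        val tmp = m(p); m(p) = m(r); m(r) = tmp
        val pivot = m(r)(c)
        for (j <- 0 until cols) m(r)(j) /= pivot
        // clear column c in every other row
        for (i <- 0 until rows if i != r) {
          val f = m(i)(c)
          if (f != 0.0) for (j <- 0 until cols) m(i)(j) -= f * m(r)(j)
        }
        pivotCols = c :: pivotCols
        r += 1
      }
      c += 1
    }
    val pivots = pivotCols.reverse // pivots(k) is the pivot column of row k
    val free = (0 until cols).filterNot(col => pivots.contains(col))
    // one basis vector per free column: set it to 1 and solve for the pivots
    free.map { f =>
      val x = Array.fill(cols)(0.0)
      x(f) = 1.0
      for ((pc, row) <- pivots.zipWithIndex) x(pc) = -m(row)(f)
      x
    }
  }
}
```

For the two equations x1 + x2 - 2*x3 = 0 and x1 - x2 = 0 the basis is the single vector (1, 1, 1), so every solution is a multiple of it. If A has full column rank, the basis is empty and x = 0 is the only solution.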


Re: Personalized Page rank in graphx

2014-08-21 Thread Martin Liesenberg
I could take a stab at it, though I'd have some reading up on
Personalized PageRank to do, before I'd be able to start coding. If
that's OK, I'd get started.

Best regards,
Martin
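
For readers following along, this is a minimal plain-Scala sketch of the textbook power iteration for Personalized PageRank on a small in-memory edge list. It is not GraphX code and not a proposal for the library API; the names, the default reset probability of 0.15 and the choice to return the rank of dangling vertices to the source are all assumptions made for illustration.

```scala
object PersonalizedPageRank {
  // edges are directed (src, dst) pairs over the vertices 0 until n.
  // Every restart jumps back to the single source vertex.
  def run(n: Int, edges: Seq[(Int, Int)], source: Int,
          resetProb: Double = 0.15, iterations: Int = 50): Array[Double] = {
    val outDegree = Array.fill(n)(0)
    edges.foreach { case (s, _) => outDegree(s) += 1 }
    // start with all rank on the source
    var rank = Array.tabulate(n)(v => if (v == source) 1.0 else 0.0)
    for (_ <- 0 until iterations) {
      val next = Array.fill(n)(0.0)
      // each vertex passes a share of its rank along every out-edge
      edges.foreach { case (s, d) =>
        next(d) += (1 - resetProb) * rank(s) / outDegree(s)
      }
      // rank held by vertices without out-edges is returned to the source
      val dangling = (0 until n).filter(v => outDegree(v) == 0).map(v => rank(v)).sum
      next(source) += resetProb + (1 - resetProb) * dangling
      rank = next
    }
    rank
  }
}
```

On a three-vertex cycle 0 -> 1 -> 2 -> 0 with source 0, the ranks sum to 1 and decrease with distance from the source.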

On 20 August 2014 23:03, Ankur Dave ankurd...@gmail.com wrote:
 At 2014-08-20 10:57:57 -0700, Mohit Singh mohit1...@gmail.com wrote:
 I was wondering if Personalized Page Rank algorithm is implemented in 
 graphx. If the talks and presentation were to be believed 
 (https://amplab.cs.berkeley.edu/wp-content/uploads/2014/02/graphx@strata2014_final.pdf)
  it is.. but cant find the algo code 
 (https://github.com/amplab/graphx/tree/master/graphx/src/main/scala/org/apache/spark/graphx/lib)?

 Those slides only mean Personalized PageRank can be expressed within the 
 neighborhood-centric model. I don't think anyone has implemented it for 
 GraphX yet. If you're interested in doing that, we'd be glad to add it to the 
 library!

 Ankur






Job using Spark for Machine Learning

2014-07-29 Thread Martin Goodson
I'm not sure if job adverts are allowed on here - please let me know if
not.

Otherwise, if you're interested in using Spark in an R&D machine learning
project then please get in touch. We are a startup based in London.

Our data sets are on a massive scale- we collect data on over a billion
users per month and are second only to Google in the contextual advertising
space (ok - a distant second!).

Details here:
http://grnh.se/rl8f25

-- 
Martin Goodson  |  VP Data Science
(0)20 3397 1240


Re: Configuring Spark Memory

2014-07-24 Thread Martin Goodson
Thank you Nishkam,
I have read your code. So, for the sake of my understanding, it seems that
for each spark context there is one executor per node? Can anyone confirm
this?


-- 
Martin Goodson  |  VP Data Science
(0)20 3397 1240


On Thu, Jul 24, 2014 at 6:12 AM, Nishkam Ravi nr...@cloudera.com wrote:

 See if this helps:

 https://github.com/nishkamravi2/SparkAutoConfig/

 It's a very simple tool for auto-configuring default parameters in Spark.
 Takes as input high-level parameters (like number of nodes, cores per node,
 memory per node, etc) and spits out default configuration, user advice and
 command line. Compile (javac SparkConfigure.java) and run (java
 SparkConfigure).

 Also cc'ing dev in case others are interested in helping evolve this over
 time (by refining the heuristics and adding more parameters).


  On Wed, Jul 23, 2014 at 8:31 AM, Martin Goodson mar...@skimlinks.com
 wrote:

 Thanks Andrew,

 So if there is only one SparkContext there is only one executor per
 machine? This seems to contradict Aaron's message from the link above:

 If each machine has 16 GB of RAM and 4 cores, for example, you might set
 spark.executor.memory between 2 and 3 GB, totaling 8-12 GB used by Spark.)

 Am I reading this incorrectly?

 Anyway our configuration is 21 machines (one master and 20 slaves) each
 with 60 GB. We would like to use 4 cores per machine. This is pyspark so we
 want to leave say 16 GB on each machine for python processes.

 Thanks again for the advice!



 --
 Martin Goodson  |  VP Data Science
 (0)20 3397 1240


 On Wed, Jul 23, 2014 at 4:19 PM, Andrew Ash and...@andrewash.com wrote:

 Hi Martin,

 In standalone mode, each SparkContext you initialize gets its own set of
 executors across the cluster.  So for example if you have two shells open,
 they'll each get two JVMs on each worker machine in the cluster.

 As far as the other docs, you can configure the total number of cores
 requested for the SparkContext, the amount of memory for the executor JVM
 on each machine, the amount of memory for the Master/Worker daemons (little
 needed since work is done in executors), and several other settings.

 Which of those are you interested in?  What spec hardware do you have
 and how do you want to configure it?

 Andrew


 On Wed, Jul 23, 2014 at 6:10 AM, Martin Goodson mar...@skimlinks.com
 wrote:

 We are having difficulties configuring Spark, partly because we still
 don't understand some key concepts. For instance, how many executors are
 there per machine in standalone mode? This is after having closely
 read the documentation several times:

 http://spark.apache.org/docs/latest/configuration.html
 http://spark.apache.org/docs/latest/spark-standalone.html
 http://spark.apache.org/docs/latest/tuning.html
 http://spark.apache.org/docs/latest/cluster-overview.html

 The cluster overview has some information here about executors but is
 ambiguous about whether there are single executors or multiple executors on
 each machine.

  This message from Aaron Davidson implies that the executor memory
 should be set to total available memory on the machine divided by the
 number of cores:
 http://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%3CCANGvG8o5K1SxgnFMT_9DK=vj_plbve6zh_dn5sjwpznpbcp...@mail.gmail.com%3E

 But other messages imply that the executor memory should be set to the
 *total* available memory of each machine.

 We would very much appreciate some clarity on this and the myriad of
 other memory settings available (daemon memory, worker memory etc). Perhaps
 a worked example could be added to the docs? I would be happy to provide
 some text as soon as someone can enlighten me on the technicalities!

 Thank you

 --
 Martin Goodson  |  VP Data Science
 (0)20 3397 1240







Re: Configuring Spark Memory

2014-07-24 Thread Martin Goodson
Great - thanks for the clarification Aaron. The offer stands for me to
write some documentation and an example that covers this without leaving
*any* room for ambiguity.




-- 
Martin Goodson  |  VP Data Science
(0)20 3397 1240


On Thu, Jul 24, 2014 at 6:09 PM, Aaron Davidson ilike...@gmail.com wrote:

 Whoops, I was mistaken in my original post last year. By default, there is
 one executor per node per Spark Context, as you said.
 spark.executor.memory is the amount of memory that the application
 requests for each of its executors. SPARK_WORKER_MEMORY is the amount of
 memory a Spark Worker is willing to allocate in executors.

 So if you were to set SPARK_WORKER_MEMORY to 8g everywhere on your
 cluster, and spark.executor.memory to 4g, you would be able to run 2
 simultaneous Spark Contexts who get 4g per node. Similarly, if
 spark.executor.memory were 8g, you could only run 1 Spark Context at a time
 on the cluster, but it would get all the cluster's memory.
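
 The arithmetic in the two paragraphs above can be written down as a small sketch. It assumes one executor per node per Spark Context, as stated above, and whole-gigabyte sizes; StandaloneMemory and its methods are hypothetical helpers, not Spark settings.

```scala
object StandaloneMemory {
  // How many Spark Contexts can each hold one executor on a worker at the
  // same time, given SPARK_WORKER_MEMORY and spark.executor.memory in GB.
  def concurrentApps(workerMemoryGb: Int, executorMemoryGb: Int): Int =
    workerMemoryGb / executorMemoryGb

  // Memory that can be offered to Spark on a machine after reserving
  // some for other processes (for example Python workers).
  def workerMemoryGb(machineMemoryGb: Int, reservedGb: Int): Int =
    machineMemoryGb - reservedGb
}
```

 With a worker memory of 8 GB, concurrentApps gives 2 for 4 GB executors and 1 for 8 GB executors. For the 60 GB machines described further down this thread, reserving 16 GB for Python leaves 44 GB that could be offered as worker memory.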


 On Thu, Jul 24, 2014 at 7:25 AM, Martin Goodson mar...@skimlinks.com
 wrote:

 Thank you Nishkam,
 I have read your code. So, for the sake of my understanding, it seems
 that for each spark context there is one executor per node? Can anyone
 confirm this?


 --
 Martin Goodson  |  VP Data Science
 (0)20 3397 1240


 On Thu, Jul 24, 2014 at 6:12 AM, Nishkam Ravi nr...@cloudera.com wrote:

 See if this helps:

 https://github.com/nishkamravi2/SparkAutoConfig/

 It's a very simple tool for auto-configuring default parameters in
 Spark. Takes as input high-level parameters (like number of nodes, cores
 per node, memory per node, etc) and spits out default configuration, user
 advice and command line. Compile (javac SparkConfigure.java) and run (java
 SparkConfigure).

 Also cc'ing dev in case others are interested in helping evolve this
 over time (by refining the heuristics and adding more parameters).


  On Wed, Jul 23, 2014 at 8:31 AM, Martin Goodson mar...@skimlinks.com
 wrote:

 Thanks Andrew,

 So if there is only one SparkContext there is only one executor per
 machine? This seems to contradict Aaron's message from the link above:

 If each machine has 16 GB of RAM and 4 cores, for example, you might
 set spark.executor.memory between 2 and 3 GB, totaling 8-12 GB used by
 Spark.)

 Am I reading this incorrectly?

 Anyway our configuration is 21 machines (one master and 20 slaves) each
 with 60 GB. We would like to use 4 cores per machine. This is pyspark so we
 want to leave say 16 GB on each machine for python processes.

 Thanks again for the advice!



 --
 Martin Goodson  |  VP Data Science
 (0)20 3397 1240


 On Wed, Jul 23, 2014 at 4:19 PM, Andrew Ash and...@andrewash.com
 wrote:

 Hi Martin,

 In standalone mode, each SparkContext you initialize gets its own set
 of executors across the cluster.  So for example if you have two shells
 open, they'll each get two JVMs on each worker machine in the cluster.

 As far as the other docs, you can configure the total number of cores
 requested for the SparkContext, the amount of memory for the executor JVM
 on each machine, the amount of memory for the Master/Worker daemons 
 (little
 needed since work is done in executors), and several other settings.

 Which of those are you interested in?  What spec hardware do you have
 and how do you want to configure it?

 Andrew


 On Wed, Jul 23, 2014 at 6:10 AM, Martin Goodson mar...@skimlinks.com
 wrote:

 We are having difficulties configuring Spark, partly because we still
 don't understand some key concepts. For instance, how many executors are
 there per machine in standalone mode? This is after having closely
 read the documentation several times:

 http://spark.apache.org/docs/latest/configuration.html
 http://spark.apache.org/docs/latest/spark-standalone.html
 http://spark.apache.org/docs/latest/tuning.html
 http://spark.apache.org/docs/latest/cluster-overview.html

 The cluster overview has some information here about executors but is
 ambiguous about whether there are single executors or multiple executors 
 on
 each machine.

  This message from Aaron Davidson implies that the executor memory
 should be set to total available memory on the machine divided by the
 number of cores:
 http://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%3CCANGvG8o5K1SxgnFMT_9DK=vj_plbve6zh_dn5sjwpznpbcp...@mail.gmail.com%3E

 But other messages imply that the executor memory should be set to
 the *total* available memory of each machine.

 We would very much appreciate some clarity on this and the myriad

Configuring Spark Memory

2014-07-23 Thread Martin Goodson
We are having difficulties configuring Spark, partly because we still don't
understand some key concepts. For instance, how many executors are there
per machine in standalone mode? This is after having closely read the
documentation several times:

http://spark.apache.org/docs/latest/configuration.html
http://spark.apache.org/docs/latest/spark-standalone.html
http://spark.apache.org/docs/latest/tuning.html
http://spark.apache.org/docs/latest/cluster-overview.html

The cluster overview has some information here about executors but is
ambiguous about whether there are single executors or multiple executors on
each machine.

 This message from Aaron Davidson implies that the executor memory should
be set to total available memory on the machine divided by the number of
cores:
http://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%3CCANGvG8o5K1SxgnFMT_9DK=vj_plbve6zh_dn5sjwpznpbcp...@mail.gmail.com%3E

But other messages imply that the executor memory should be set to the
*total* available memory of each machine.

We would very much appreciate some clarity on this and the myriad of other
memory settings available (daemon memory, worker memory etc). Perhaps a
worked example could be added to the docs? I would be happy to provide some
text as soon as someone can enlighten me on the technicalities!

Thank you

-- 
Martin Goodson  |  VP Data Science
(0)20 3397 1240


Re: Configuring Spark Memory

2014-07-23 Thread Martin Goodson
Thanks Andrew,

So if there is only one SparkContext there is only one executor per
machine? This seems to contradict Aaron's message from the link above:

If each machine has 16 GB of RAM and 4 cores, for example, you might set
spark.executor.memory between 2 and 3 GB, totaling 8-12 GB used by Spark.)

Am I reading this incorrectly?

Anyway our configuration is 21 machines (one master and 20 slaves) each
with 60 GB. We would like to use 4 cores per machine. This is pyspark so we
want to leave say 16 GB on each machine for python processes.

Thanks again for the advice!



-- 
Martin Goodson  |  VP Data Science
(0)20 3397 1240


On Wed, Jul 23, 2014 at 4:19 PM, Andrew Ash and...@andrewash.com wrote:

 Hi Martin,

 In standalone mode, each SparkContext you initialize gets its own set of
 executors across the cluster.  So for example if you have two shells open,
 they'll each get two JVMs on each worker machine in the cluster.

 As far as the other docs, you can configure the total number of cores
 requested for the SparkContext, the amount of memory for the executor JVM
 on each machine, the amount of memory for the Master/Worker daemons (little
 needed since work is done in executors), and several other settings.

 Which of those are you interested in?  What spec hardware do you have and
 how do you want to configure it?

 Andrew


 On Wed, Jul 23, 2014 at 6:10 AM, Martin Goodson mar...@skimlinks.com
 wrote:

 We are having difficulties configuring Spark, partly because we still
 don't understand some key concepts. For instance, how many executors are
 there per machine in standalone mode? This is after having closely read
 the documentation several times:

 http://spark.apache.org/docs/latest/configuration.html
 http://spark.apache.org/docs/latest/spark-standalone.html
 http://spark.apache.org/docs/latest/tuning.html
 http://spark.apache.org/docs/latest/cluster-overview.html

 The cluster overview has some information here about executors but is
 ambiguous about whether there are single executors or multiple executors on
 each machine.

  This message from Aaron Davidson implies that the executor memory
 should be set to total available memory on the machine divided by the
 number of cores:
 http://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%3CCANGvG8o5K1SxgnFMT_9DK=vj_plbve6zh_dn5sjwpznpbcp...@mail.gmail.com%3E

 But other messages imply that the executor memory should be set to the
 *total* available memory of each machine.

 We would very much appreciate some clarity on this and the myriad of
 other memory settings available (daemon memory, worker memory etc). Perhaps
 a worked example could be added to the docs? I would be happy to provide
 some text as soon as someone can enlighten me on the technicalities!

 Thank you

 --
 Martin Goodson  |  VP Data Science
 (0)20 3397 1240





Re: Problem running Spark shell (1.0.0) on EMR

2014-07-22 Thread Martin Goodson
I am having exactly the same problem when calling it from pyspark. Has
anyone managed to get this script to work?


-- 
Martin Goodson  |  VP Data Science
(0)20 3397 1240


On Wed, Jul 16, 2014 at 2:10 PM, Ian Wilkinson ia...@me.com wrote:

 Hi,

 I’m trying to run the Spark (1.0.0) shell on EMR and encountering a
 classpath issue.
 I suspect I’m missing something gloriously obvious, but so far it is
 eluding me.

 I launch the EMR Cluster (using the aws cli) with:

 aws emr create-cluster --name "Test Cluster" \
   --ami-version 3.0.3 \
   --no-auto-terminate \
   --ec2-attributes KeyName=... \
   --bootstrap-actions Path=s3://elasticmapreduce/samples/spark/1.0.0/install-spark-shark.rb \
   --instance-groups \
     InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m1.medium \
     InstanceGroupType=CORE,InstanceCount=1,InstanceType=m1.medium \
   --region eu-west-1

 then,

 $ aws emr ssh --cluster-id ... --key-pair-file ... --region eu-west-1

 On the master node, I then launch the shell with:

 [hadoop@ip-... spark]$ ./bin/spark-shell

 and try performing:

 scala> val logs = sc.textFile("s3n://.../")

 this produces:

 14/07/16 12:40:35 WARN storage.BlockManager: Putting block broadcast_0
 failed
 java.lang.NoSuchMethodError:
 com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode;


 Any help mighty welcome,
 ian
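A NoSuchMethodError like the one above generally means that the class was loaded from a different version of the library than the one the caller was compiled against. One way to narrow this down is to ask the JVM where a class came from. The following is a minimal sketch; the object and method names are hypothetical, and in spark-shell the class of interest would be classOf[com.google.common.hash.HashFunction] rather than the stand-ins used here.

```scala
object WhichJar {
  // Returns the location (jar or directory) a class was loaded from, or None
  // when the JVM reports no code source, as it does for core JDK classes.
  def locationOf(cls: Class[_]): Option[String] =
    for {
      domain <- Option(cls.getProtectionDomain)
      source <- Option(domain.getCodeSource)
      url    <- Option(source.getLocation)
    } yield url.toString

  def main(args: Array[String]): Unit = {
    // A core JDK class has no code source.
    println(locationOf(classOf[String]))
    // An application class usually reports the jar or directory it lives in.
    println(locationOf(WhichJar.getClass))
  }
}
```

If the reported jar turns out to be an older copy of the library shipped with another component on the classpath, that would explain the missing method.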




Re: TreeNodeException: No function to evaluate expression. type: AttributeReference, tree: id#0 on GROUP BY

2014-07-21 Thread Martin Gammelsæter
Aha, that makes sense. Thanks for the response! I guess one of the
areas where Spark could use some love is error messages (:

On Fri, Jul 18, 2014 at 9:41 PM, Michael Armbrust
mich...@databricks.com wrote:
 Sorry for the non-obvious error message.  It is not valid SQL to include
 attributes in the select clause unless they are also in the group by clause
 or are inside of an aggregate function.
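 The rule can be illustrated with plain Scala collections, which make it visible why a non-grouped column has no single value per group. This is only a sketch: the Doc class and its fields are hypothetical stand-ins, since the real Dokument class from the pastebin is not shown in this thread.

```scala
object GroupBySketch {
  // Hypothetical row type; the real Dokument class is not shown in the thread.
  final case class Doc(id: Int, kind: String)

  // Equivalent in spirit to: SELECT kind, COUNT(id) FROM docs GROUP BY kind
  // Every output column is either the grouping key or an aggregate.
  def countByKind(docs: Seq[Doc]): Map[String, Int] =
    docs.groupBy(_.kind).map { case (kind, group) => kind -> group.size }

  def main(args: Array[String]): Unit = {
    val docs = Seq(Doc(1, "a"), Doc(2, "a"), Doc(3, "b"))
    // Group "a" holds ids 1 and 2, so "SELECT id, kind ... GROUP BY kind"
    // would have no single id to report for it.
    println(countByKind(docs))
  }
}
```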

 On Jul 18, 2014 5:12 AM, Martin Gammelsæter martingammelsae...@gmail.com
 wrote:

 Hi again!

 I am having problems when using GROUP BY on both SQLContext and
 HiveContext (same problem).

 My code (simplified as much as possible) can be seen here:
 http://pastebin.com/33rjW67H

 In short, I'm getting data from a Cassandra store with Datastax' new
 driver (which works great by the way, recommended!), and mapping it to
 a Spark SQL table through a Product class (Dokument in the source).
 Regular SELECTs and the like work fine, but once I try to do a GROUP BY,
 I get the following error:

 Exception in thread "main" org.apache.spark.SparkException: Job
 aborted due to stage failure: Task 0.0:25 failed 4 times, most recent
 failure: Exception failure in TID 63 on host 192.168.121.132:
 org.apache.spark.sql.catalyst.errors.package$TreeNodeException: No
 function to evaluate expression. type: AttributeReference, tree: id#0

 org.apache.spark.sql.catalyst.expressions.AttributeReference.eval(namedExpressions.scala:158)

 org.apache.spark.sql.catalyst.expressions.MutableProjection.apply(Projection.scala:64)

 org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7$$anon$1.next(Aggregate.scala:195)

 org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7$$anon$1.next(Aggregate.scala:174)
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 scala.collection.Iterator$class.foreach(Iterator.scala:727)
 scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)

 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)

 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)

 scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
 scala.collection.AbstractIterator.to(Iterator.scala:1157)

 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)

 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
 org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:750)
 org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:750)

 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1096)

 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1096)

 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:112)
 org.apache.spark.scheduler.Task.run(Task.scala:51)

 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:745)

 What am I doing wrong?

 --
 Best regards,
 Martin Gammelsæter



-- 
Mvh.
Martin Gammelsæter
92209139


TreeNodeException: No function to evaluate expression. type: AttributeReference, tree: id#0 on GROUP BY

2014-07-18 Thread Martin Gammelsæter
Hi again!

I am having problems when using GROUP BY on both SQLContext and
HiveContext (same problem).

My code (simplified as much as possible) can be seen here:
http://pastebin.com/33rjW67H

In short, I'm getting data from a Cassandra store with Datastax' new
driver (which works great by the way, recommended!), and mapping it to
a Spark SQL table through a Product class (Dokument in the source).
Regular SELECTs and the like work fine, but once I try to do a GROUP BY,
I get the following error:

Exception in thread "main" org.apache.spark.SparkException: Job
aborted due to stage failure: Task 0.0:25 failed 4 times, most recent
failure: Exception failure in TID 63 on host 192.168.121.132:
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: No
function to evaluate expression. type: AttributeReference, tree: id#0

org.apache.spark.sql.catalyst.expressions.AttributeReference.eval(namedExpressions.scala:158)

org.apache.spark.sql.catalyst.expressions.MutableProjection.apply(Projection.scala:64)

org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7$$anon$1.next(Aggregate.scala:195)

org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7$$anon$1.next(Aggregate.scala:174)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)

scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
scala.collection.AbstractIterator.to(Iterator.scala:1157)

scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)

scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:750)
org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:750)

org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1096)

org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1096)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:112)
org.apache.spark.scheduler.Task.run(Task.scala:51)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

What am I doing wrong?

-- 
Best regards,
Martin Gammelsæter


Re: Supported SQL syntax in Spark SQL

2014-07-14 Thread Martin Gammelsæter
I am very interested in the original question as well. Is there any
list (even if it is simply in the code) of all supported syntax for
Spark SQL?

On Mon, Jul 14, 2014 at 6:41 AM, Nicholas Chammas
nicholas.cham...@gmail.com wrote:
 Are you sure the code running on the cluster has been updated?

 I launched the cluster using spark-ec2 from the 1.0.1 release, so I’m
 assuming that’s taken care of, at least in theory.

 I just spun down the clusters I had up, but I will revisit this tomorrow and
 provide the information you requested.

 Nick



-- 
Mvh.
Martin Gammelsæter
92209139


Re: Disabling SparkContext WebUI on port 4040, accessing information programatically?

2014-07-09 Thread Martin Gammelsæter
Thanks for your input, Koert and DB. Rebuilding with 9.x didn't seem
to work. For now we've downgraded dropwizard to 0.6.2 which uses a
compatible version of jetty. Not optimal, but it works for now.

On Tue, Jul 8, 2014 at 7:04 PM, DB Tsai dbt...@dbtsai.com wrote:
 We're doing a similar thing to launch Spark jobs in Tomcat, and I opened a
 JIRA for this. There are a couple of technical discussions there.

 https://issues.apache.org/jira/browse/SPARK-2100

 In the end, we realized that Spark uses Jetty not only for the Spark
 WebUI but also for distributing jars and tasks, so it is really hard
 to remove the web dependency from Spark.

 As a result, we launch our Spark job in yarn-cluster mode, and at
 runtime the only dependency in our web application is spark-yarn,
 which doesn't contain any of Spark's web stuff.

 PS: upgrading Spark's Jetty from 8.x to 9.x may not be as
 straightforward as changing the version in the Spark build script.
 Jetty 9.x requires Java 7, since the servlet API it uses (Servlet 3.1)
 requires Java 7.

 Sincerely,

 DB Tsai
 ---
 My Blog: https://www.dbtsai.com
 LinkedIn: https://www.linkedin.com/in/dbtsai


 On Tue, Jul 8, 2014 at 8:43 AM, Koert Kuipers ko...@tresata.com wrote:
 do you control your cluster and spark deployment? if so, you can try to
 rebuild with jetty 9.x


 On Tue, Jul 8, 2014 at 9:39 AM, Martin Gammelsæter
 martingammelsae...@gmail.com wrote:

 Digging a bit more I see that there is yet another jetty instance that
 is causing the problem, namely the BroadcastManager has one. I guess
 this one isn't very wise to disable... It might very well be that the
 WebUI is a problem as well, but I guess the code doesn't get far
 enough. Any ideas on how to solve this? Spark seems to use jetty
 8.1.14, while dropwizard uses jetty 9.0.7, so that might be the source
 of the problem. Any ideas?

 On Tue, Jul 8, 2014 at 2:58 PM, Martin Gammelsæter
 martingammelsae...@gmail.com wrote:
  Hi!
 
  I am building a web frontend for a Spark app, allowing users to input
  sql/hql and get results back. When starting a SparkContext from within
  my server code (using jetty/dropwizard) I get the error
 
  java.lang.NoSuchMethodError:
  org.eclipse.jetty.server.AbstractConnector: method init()V not found
 
  when Spark tries to fire up its own jetty server. This does not happen
  when running the same code without my web server. This is probably
  fixable somehow(?) but I'd like to disable the webUI as I don't need
  it, and ideally I would like to access that information
  programmatically instead, allowing me to embed it in my own web
  application.
 
  Is this possible?
 
  --
  Best regards,
  Martin Gammelsæter



 --
 Mvh.
 Martin Gammelsæter
 92209139





-- 
Mvh.
Martin Gammelsæter
92209139


Initial job has not accepted any resources means many things

2014-07-09 Thread Martin Gammelsæter
It seems that the "Initial job has not accepted any resources" message shows
up for a wide variety of different errors: the obvious one, where you have
requested more memory than is available, but also, for example, the case
where the worker nodes do not have the appropriate code on their class
path. Debugging from this error is very hard, as the underlying errors do
not show up in the logs on the workers. Is this a known issue? I'm having
trouble getting the code to the workers without using addJar. My code is a
fairly static application, so I'd like to avoid calling addJar every time
the app starts up and instead manually add the jar to the classpath of
every worker, but I can't seem to find out how.

-- 
Best regards,
Martin Gammelsæter


Re: Disabling SparkContext WebUI on port 4040, accessing information programatically?

2014-07-08 Thread Martin Gammelsæter
Digging a bit more I see that there is yet another jetty instance that
is causing the problem, namely the BroadcastManager has one. I guess
this one isn't very wise to disable... It might very well be that the
WebUI is a problem as well, but I guess the code doesn't get far
enough. Any ideas on how to solve this? Spark seems to use jetty
8.1.14, while dropwizard uses jetty 9.0.7, so that might be the source
of the problem. Any ideas?

On Tue, Jul 8, 2014 at 2:58 PM, Martin Gammelsæter
martingammelsae...@gmail.com wrote:
 Hi!

 I am building a web frontend for a Spark app, allowing users to input
 sql/hql and get results back. When starting a SparkContext from within
 my server code (using jetty/dropwizard) I get the error

 java.lang.NoSuchMethodError:
 org.eclipse.jetty.server.AbstractConnector: method init()V not found

 when Spark tries to fire up its own jetty server. This does not happen
 when running the same code without my web server. This is probably
 fixable somehow(?) but I'd like to disable the webUI as I don't need
 it, and ideally I would like to access that information
 programmatically instead, allowing me to embed it in my own web
 application.

 Is this possible?

 --
 Best regards,
 Martin Gammelsæter



-- 
Mvh.
Martin Gammelsæter
92209139


Re: Spark SQL user defined functions

2014-07-07 Thread Martin Gammelsæter
Hi again, and thanks for your reply!

On Fri, Jul 4, 2014 at 8:45 PM, Michael Armbrust mich...@databricks.com wrote:

 Sweet. Any idea about when this will be merged into master?


 It is probably going to be a couple of weeks.  There is a fair amount of
 cleanup that needs to be done.  It works though and we used it in most of
 the demos at the spark summit.  Mostly I just need to add tests and move it
 out of HiveContext (there is no good reason for that code to depend on
 HiveContext). So you could also just try working with that branch.


 This is probably a stupid question, but can you query Spark SQL tables
 from a (local?) hive context? In which case using that could be a
 workaround until the PR is merged.


 Yeah, this is kind of subtle.  In a HiveContext, SQL Tables are just an
 additional catalog that sits on top of the metastore.  All the query
 execution occurs in the same code path, including the use of the Hive
 Function Registry, independent of where the table comes from.  So for your
 use case you can just create a hive context, which will create a local
 metastore automatically if no hive-site.xml is present.

Nice, that sounds like it'll solve my problems. Just for clarity, are
LocalHiveContext and HiveContext equivalent if no hive-site.xml is present,
or are there still differences?

-- 
Best regards,
Martin Gammelsæter


Re: How to use groupByKey and CqlPagingInputFormat

2014-07-05 Thread Martin Gammelsæter
Ah, I see. Thank you!

As we are in the process of building the system we have not tried with
any large amounts of data yet, but when the time comes I'll try both
implementations and do a small benchmark.

On Fri, Jul 4, 2014 at 9:20 PM, Mohammed Guller moham...@glassbeam.com wrote:
 As far as I know, there is not much difference, except that the outer
 parentheses are redundant. The problem with your original code was that there
 was a mismatch between the opening and closing parentheses. Sometimes the error
 messages are misleading :-)
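 The equivalence of the two forms can be checked with a small, self-contained sketch on an ordinary Seq instead of the Cassandra RDD. The object and value names are made up for illustration.

```scala
object BraceVsParen {
  val pairs: Seq[(String, Int)] = Seq(("a", 1), ("b", 2))

  // Braces alone: the block is a pattern-matching function literal.
  val withBraces: Seq[String] = pairs.map { case (k, v) => s"$k$v" }

  // Parentheses around the braces: the same function literal passed as an
  // ordinary parenthesised argument. The outer parentheses add nothing.
  val withBoth: Seq[String] = pairs.map({ case (k, v) => s"$k$v" })

  def main(args: Array[String]): Unit = {
    // Both forms produce the same result.
    println(withBraces == withBoth)
  }
}
```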

 Do you see any performance difference with the Datastax spark driver?

 Mohammed

 -Original Message-
 From: Martin Gammelsæter [mailto:martingammelsae...@gmail.com]
 Sent: Friday, July 4, 2014 12:43 AM
 To: user@spark.apache.org
 Subject: Re: How to use groupByKey and CqlPagingInputFormat

 On Thu, Jul 3, 2014 at 10:29 PM, Mohammed Guller moham...@glassbeam.com 
 wrote:
 Martin,

 1) The first map contains the columns in the primary key, which could be a 
 compound primary key containing multiple columns,  and the second map 
 contains all the non-key columns.

 Ah, thank you, that makes sense.

 2) try this fixed code:
 val navnrevmap = casRdd.map {
   case (key, value) =>
     (ByteBufferUtil.string(value.get("navn")),
      ByteBufferUtil.toInt(value.get("revisjon")))
 }.groupByKey()

 I changed from CqlPagingInputFormat to the new Datastax cassandra-spark 
 driver, which is a bit easier to work with, but thanks! I'm curious though, 
 what is the semantic difference between
 map({}) and map{}?



-- 
Mvh.
Martin Gammelsæter
92209139


Spark SQL user defined functions

2014-07-04 Thread Martin Gammelsæter
Hi!

I have a Spark cluster running on top of a Cassandra cluster, using
Datastax' new driver, and one of the fields of my RDDs is an
XML-string. In a normal Scala sparkjob, parsing that data is no
problem, but I would like to also make that information available
through Spark SQL. So, is there any way to write user defined
functions for Spark SQL? I know that a HiveContext is available, but I
understand that that is for querying data from Hive, and I don't have
Hive in my stack (please correct me if I'm wrong).

I would love to be able to do something like the following:

val casRdd = sparkCtx.cassandraTable(ks, cf)

// registerAsTable etc

val res = sql("SELECT id, xmlGetTag(xmlfield, 'sometag') FROM cf")
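
For reference, the function body itself could look roughly like the sketch below, written against the JDK's built-in XML parser so that it runs without extra dependencies. The name xmlGetTag mirrors the wished-for query above, but the exact behaviour (first matching element, None on malformed input) is an assumption, and registering the function with Spark SQL is deliberately left out because that depends on the Spark version.

```scala
import java.io.StringReader
import javax.xml.parsers.DocumentBuilderFactory
import org.xml.sax.{InputSource, SAXException}

object XmlGetTagSketch {
  // Returns the text of the first element named `tag`, or None if there is
  // no such element or the input is not well-formed XML.
  def xmlGetTag(xml: String, tag: String): Option[String] =
    try {
      val builder = DocumentBuilderFactory.newInstance().newDocumentBuilder()
      val doc = builder.parse(new InputSource(new StringReader(xml)))
      val nodes = doc.getElementsByTagName(tag)
      if (nodes.getLength > 0) Option(nodes.item(0).getTextContent) else None
    } catch {
      case _: SAXException => None // malformed XML
    }

  def main(args: Array[String]): Unit = {
    println(xmlGetTag("<doc><sometag>hello</sometag></doc>", "sometag"))
  }
}
```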

-- 
Best regards,
Martin Gammelsæter


Re: Spark SQL user defined functions

2014-07-04 Thread Martin Gammelsæter
Takuya, thanks for your reply :)
I am already doing that, and it is working well. My question is, can I
define arbitrary functions to be used in these queries?

On Fri, Jul 4, 2014 at 11:12 AM, Takuya UESHIN ues...@happy-camper.st wrote:
 Hi,

 You can convert standard RDD of Product class (e.g. case class) to SchemaRDD
 by SQLContext.
 Load data from Cassandra into RDD of case class, convert it to SchemaRDD and
 register it,
 then you can use it in your SQLs.

 http://spark.apache.org/docs/latest/sql-programming-guide.html#running-sql-on-rdds

 Thanks.



 2014-07-04 17:59 GMT+09:00 Martin Gammelsæter
 martingammelsae...@gmail.com:

 Hi!

 I have a Spark cluster running on top of a Cassandra cluster, using
 Datastax' new driver, and one of the fields of my RDDs is an
 XML-string. In a normal Scala sparkjob, parsing that data is no
 problem, but I would like to also make that information available
 through Spark SQL. So, is there any way to write user defined
 functions for Spark SQL? I know that a HiveContext is available, but I
 understand that that is for querying data from Hive, and I don't have
 Hive in my stack (please correct me if I'm wrong).

 I would love to be able to do something like the following:

 val casRdd = sparkCtx.cassandraTable(ks, cf)

 // registerAsTable etc

 val res = sql("SELECT id, xmlGetTag(xmlfield, 'sometag') FROM cf")

 --
 Best regards,
 Martin Gammelsæter




 --
 Takuya UESHIN
 Tokyo, Japan

 http://twitter.com/ueshin



-- 
Mvh.
Martin Gammelsæter
92209139

