Re: Log4j 1.2.17 spark CVE

2021-12-13 Thread Jörn Franke
Is it in any case appropriate to use log4j 1.x, which is no longer maintained 
and has other security vulnerabilities that won’t be fixed anymore?
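
For anyone who wants to double-check what actually sits on the driver classpath, here is a 
minimal sketch meant to be pasted into spark-shell; the class names are the standard log4j 
ones, and the check is only a hint, not a full audit:

def classPresent(name: String): Boolean =
  try { Class.forName(name); true } catch { case _: ClassNotFoundException => false }

val log4j1Present = classPresent("org.apache.log4j.Logger")             // log4j 1.x core
val log4j2Present = classPresent("org.apache.logging.log4j.Logger")     // log4j 2.x core
val jmsAppenderLoadable = classPresent("org.apache.log4j.net.JMSAppender")  // class behind the 1.x concern (CVE-2021-4104)
println(s"log4j 1.x: $log4j1Present, log4j 2.x: $log4j2Present, JMSAppender loadable: $jmsAppenderLoadable")

Even if JMSAppender is loadable, it is only a risk when it is actually configured, which 
Spark's default logging configuration does not do.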

> On 13.12.2021 at 06:06, Sean Owen wrote:
> 
> 
> Check the CVE - the log4j vulnerability appears to affect log4j 2, not 1.x. 
> There was mention that it could affect 1.x when used with JNDI or JMS 
> handlers, but Spark uses neither. (Unless anyone can think of something I'm 
> missing, but I have never heard or seen that come up at all in 7 years in Spark.)
> 
> The big issue would be applications that themselves configure log4j 2.x, but 
> that's not a Spark issue per se.
> 
>> On Sun, Dec 12, 2021 at 10:46 PM Pralabh Kumar  
>> wrote:
>> Hi developers,  users 
>> 
> Spark is built using log4j 1.2.17. Is there a plan to upgrade based on the 
> recently detected CVE?
>> 
>> 
>> Regards
>> Pralabh kumar


Re: Hive external table not working in sparkSQL when subdirectories are present

2019-08-07 Thread Jörn Franke
Do you use the HiveContext in Spark? Do you configure the same options there? 
Can you share some code?

> On 07.08.2019 at 08:50, Rishikesh Gawade wrote:
> 
> Hi.
> I am using Spark 2.3.2 and Hive 3.1.0. 
> Even if I use Parquet files the result would be the same, because after all 
> Spark SQL isn't able to descend into the subdirectories over which the table 
> is created. Could there be any other way?
> Thanks,
> Rishikesh
> 
>> On Tue, Aug 6, 2019, 1:03 PM Mich Talebzadeh  
>> wrote:
>> Which versions of Spark and Hive are you using?
>> 
>> What will happen if you use Parquet tables instead?
>> 
>> HTH
>> 
>> Dr Mich Talebzadeh
>>  
>> LinkedIn  
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>  
>> http://talebzadehmich.wordpress.com
>> 
>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>> loss, damage or destruction of data or any other property which may arise 
>> from relying on this email's technical content is explicitly disclaimed. The 
>> author will in no case be liable for any monetary damages arising from such 
>> loss, damage or destruction.
>>  
>> 
>> 
>>> On Tue, 6 Aug 2019 at 07:58, Rishikesh Gawade  
>>> wrote:
>>> Hi.
>>> I have built a Hive external table on top of a directory 'A' which has data 
>>> stored in ORC format. This directory has several subdirectories inside it, 
>>> each of which contains the actual ORC files.
>>> These subdirectories are actually created by spark jobs which ingest data 
>>> from other sources and write it into this directory.
>>> I tried creating a table and setting its table properties 
>>> hive.mapred.supports.subdirectories=TRUE and 
>>> mapred.input.dir.recursive=TRUE.
>>> As a result of this, when I fire the simplest query, select count(*) from 
>>> ExtTable, via the Hive CLI, it successfully gives me the expected count of 
>>> records in the table.
>>> However, when I fire the same query via Spark SQL, I get count = 0.
>>> 
>>> I think Spark SQL isn't able to descend into the subdirectories to get 
>>> the data, while Hive is able to do so.
>>> Are there any configurations that need to be set on the Spark side so that 
>>> this works as it does via the Hive CLI? 
>>> I am using Spark on YARN.
>>> 
>>> Thanks,
>>> Rishikesh
>>> 
>>> Tags: subdirectories, subdirectory, recursive, recursion, hive external 
>>> table, orc, sparksql, yarn
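
For reference, a hedged sketch of the settings people commonly try for this situation; the 
keys are standard Hadoop/Hive/Spark ones, but whether they are sufficient for Spark 2.3.2 
against a Hive 3.1.0 metastore is an assumption, not something verified here:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hive-subdir-read")
  .enableHiveSupport()
  // Hadoop/Hive switches for recursive input listing, passed through with the spark.hadoop. prefix.
  .config("spark.hadoop.hive.mapred.supports.subdirectories", "true")
  .config("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true")
  .getOrCreate()

// If Spark converts the table to its native ORC reader, forcing the Hive SerDe path can change
// how the input directories are listed.
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "false")

spark.sql("SELECT COUNT(*) FROM ExtTable").show()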


Re: Logistic Regression Iterations causing High GC in Spark 2.3

2019-07-29 Thread Jörn Franke
I would remove all the GC tuning and add it back later once you have found the underlying 
root cause. Usually more GC means you need to provide more memory, because 
something has changed (your application, the Spark version, etc.).

We don’t have your full code to give exact advice, but you may want to rethink 
the one-core-per-executor approach and have fewer executors with more cores per 
executor. Having many single-core executors can sometimes lead to more heap usage 
(especially if you broadcast). Keep in mind that using more cores per executor usually also 
requires more memory per executor, but fewer executors. Similarly, the 
executor instances might be too many and they may not have enough heap.
You can also increase the memory of the executor.
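
A hedged sketch of that kind of resizing; the numbers are placeholders for illustration only, 
not a recommendation for this workload, and the GC flags are deliberately left out until the 
root cause is known:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("logreg-resized")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.executor.instances", "5")   // fewer executors ...
  .config("spark.executor.cores", "4")       // ... with more cores each ...
  .config("spark.executor.memory", "36g")    // ... and more heap per executor
  .getOrCreate()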

> On 29.07.2019 at 08:22, Dhrubajyoti Hati wrote:
> 
> Hi,
> 
> We were running Logistic Regression in Spark 2.2.X and then we tried to see 
> how it does in Spark 2.3.X. Now we are facing an issue while running a 
> Logistic Regression model in Spark 2.3.X on top of YARN (GCP Dataproc). In the 
> TreeAggregate method it takes a huge amount of time due to very high GC activity. I 
> have tuned the GC, created different-sized clusters, tried a higher Spark 
> version (2.4.X) and smaller data, but nothing helps. The GC time is 100-1000 
> times the processing time on average per iteration. 
> 
> The strange part is in Spark 2.2 this doesn't happen at all. Same code, same 
> cluster sizing, same data in both the cases.
> 
> I was wondering if someone can explain this behaviour and help me to resolve 
> it. How can the same code have such different behaviour in two Spark versions, 
> especially the higher ones?
> 
> Here are the configs which I used:
> 
> spark.serializer=org.apache.spark.serializer.KryoSerializer
> #GC Tuning
> spark.executor.extraJavaOptions= -XX:+UseG1GC -XX:+PrintFlagsFinal 
> -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 
> -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions 
> -XX:+G1SummarizeConcMark -Xms9000m -XX:ParallelGCThreads=20 
> -XX:ConcGCThreads=5
> 
> spark.executor.instances=20
> spark.executor.cores=1
> spark.executor.memory=9010m
> 
> 
> Regards,
> Dhrub
> 


Re: Custom datasource: when acquire and release a lock?

2019-05-26 Thread Jörn Franke
What does your data source structure look like?
Can’t you release it at the end of the buildScan method? 

What technology is used in the transactional data endpoint?


> On 24.05.2019 at 15:36, Abhishek Somani wrote:
> 
> Hi experts,
> 
> I am trying to create a custom Spark Datasource(v1) to read from a 
> transactional data endpoint, and I need to acquire a lock with the endpoint 
> before fetching data and release the lock after reading. Note that the lock 
> acquisition and release needs to happen in the Driver JVM.
> 
> I have created a custom RDD for this purpose, and tried acquiring the lock in 
> MyRDD.getPartitions(), and releasing the lock at the end of the job by 
> registering a QueryExecutionListener. 
> 
> Now as I have learnt, this is not the right approach as the RDD can get 
> reused on further actions WITHOUT calling getPartitions() again(as the 
> partitions of an RDD get cached). For example, if someone calls 
> Dataset.collect() twice, the first time MyRDD.getPartitions() will get 
> invoked, I will acquire a lock and release the lock at the end. However the 
> second time collect() is called, getPartitions will NOT be called again as 
> the RDD would be reused and the partitions would have gotten cached in the 
> RDD. 
> 
> Can someone advise me on where the right places would be to acquire and 
> release a lock with my data endpoint in this scenario.
> 
> Thanks a lot,
> Abhishek Somani
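
One generic driver-side shape for this, sketched under the assumption that the application 
code (not the data source) can wrap the action; acquireLock/releaseLock are hypothetical 
calls into the transactional endpoint:

// Brackets a driver-side action with acquire/release; the release happens even if the job fails.
def withEndpointLock[T](acquireLock: () => Unit, releaseLock: () => Unit)(action: => T): T = {
  acquireLock()            // runs in the driver JVM before any task is scheduled
  try action               // e.g. dataset.collect() - this triggers the Spark job(s)
  finally releaseLock()
}

// Usage: withEndpointLock(() => endpoint.lock(), () => endpoint.unlock()) { df.collect() }

It does not solve the cached-getPartitions problem inside the RDD itself, but it keeps the 
lock scope aligned with each action, including repeated collect() calls.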




Re: [VOTE] [SPARK-24615] SPIP: Accelerator-aware Scheduling

2019-03-19 Thread Jörn Franke
Also on AWS and probably some more cloud providers 

> On 19.03.2019 at 19:45, Steve Loughran wrote:
> 
> 
> you might want to look at the work on FPGA resources; again, it should just be 
> a resource available to a scheduler. The key thing is probably just to keep the 
> docs generic.
> 
> https://hadoop.apache.org/docs/r3.1.0/hadoop-yarn/hadoop-yarn-site/UsingFPGA.html
> 
> I don't know where you get those FPGAs to play with; the Azure ML stuff looks 
> like the kind of thing to think about though: 
> https://docs.microsoft.com/en-us/azure/machine-learning/service/concept-accelerate-with-fpgas


Re: [External] Re: [Spark RPC] Help how to debug sudden performance issue

2019-03-11 Thread Jörn Franke
Well, it will be difficult to say anything without knowing func. It could be 
that 40 cores and 200 GB for an executor is not a setup that suits the func and 
the overall architecture.

It could also be GC issues, etc.

Sometimes it also does not help to throw hardware at the issue. It really depends 
on the processing pipeline.

> On 11.03.2019 at 08:50, Hough, Stephen C wrote:
> 
> Thanks
>  
> There is no issue on the worker/executor side; they have ample memory (> 200 GB). 
> I gave that information as background to the system, apologies for the 
> confusion.
>  
> The problem is isolated to the lifetime of processing a DriverEndpoint 
> StatusUpdate message. For 40 minutes the system runs fine with 30+ 
> dispatcher threads taking turns to process results; then, given an external, yet 
> to be determined trigger, there is a slight slowdown, but it’s enough to upset 
> the system for 50 minutes before recovering.
>  
> I have little expectation of finding a solution; this is a last-resort punt. 
> I was hoping you devs may have more insight into possible sources of 
> interruption, like why it would take StatusUpdate almost exactly 100ms to 
> process one result (as seen by the consecutive TIDs below), or any logging I 
> may be able to turn on to narrow the search.
>  
> There are no errors or warnings in the logs.
>  
>  
> From: Jörn Franke [mailto:jornfra...@gmail.com] 
> Sent: Monday, March 11, 2019 3:08 PM
> To: Hough, Stephen C 
> Cc: dev@spark.apache.org
> Subject: [External] Re: [Spark RPC] Help how to debug sudden performance issue
>  
> Well it is a little bit difficult to say, because a lot of things are mixing 
> up here. What function is calculated? Does it need a lot of memory? Could it 
> be that you run out of memory and some spillover happens and you have a lot 
> of IO to disk which is blocking? 
>  
> Related to that could be 1 executor 40 cores. How much memory does it have 
> and need?
>  
> I would not put Kafka+ZK on the server where the driver is running.
>  
> A different Spark version - that may depend on what are the answers to the 
> questions above.
> 
> On 11.03.2019 at 07:40, Hough, Stephen C wrote:
> 
> Spark Version: 2.0.2
>  
> I am running a cluster with 400 workers where each worker has 1 executor 
> configured with 40 cores for a total capacity of 16,000 cores.
> I run about 10,000 jobs with 1.5M tasks where the job is a simple 
> spark.parallelize(list, list.size()).map(func).collectAsync().  The job 
> contains a list of tasks ranging from 1 to 2000.
> ‘func’ will run our code which will do computations then post events over 
> Kafka and return a Boolean, we ignore the result.  The duration of this func 
> can be from seconds up to 20 mins.
>  
> The application context is launched on a driver server with 32 cores and the 
> only other services running on the box is a Kafka broker and zookeeper 
> service.
>  
> This particular batch in development environment took around 2 hours to run 
> which met our SLA however when we ran in production it took 3 hours to 
> complete, we thought it may have been due to another cluster we were running 
> with around 300 workers however AWS informed us that the networks were 
> isolated.  I moved the job to run later after the other clusters batch had 
> finished and the time reduced back down to 2 hrs. 
> I analyzed our logs and it shows that a yet to be determined incident 
> @22:02:42 caused Spark to ‘go slow’.
>  
> By capturing the duration from the executor thread message ‘Finished task’ I 
> tracked the TID seen by the task result getter to determine duration until 
> the result is processed on the driver and a core is freed for the scheduler.
>  
> For the most part it is within a reasonable range of 10ms then suddenly at 
> the given incident time it suddenly rises to 5s, 20s, 587s, peaking at 32m 
> only 8 mins after the incident.  So it takes 32 mins from the time the result 
> was sent back to spark driver to the time it is processed which explains the 
> performance hit because during this time the freed cores on the worker go 
> idle waiting for a new task.  Note I did track the time I saw our Kafka event 
> sent by this task and we saw it roughly 2ms later on the driver so the 
> results are getting to server over the network okay.
>  
> Looking at the RPC code it became apparent to me that if we start to see a 
> build-up of messages the dispatcher should turn single-threaded as it processes 
> the backlog, so I did another scan of the driver logs to look for long 
> running dispatcher threads, i.e. a dispatcher that processes more than 1 
> consecutive message.  A very obvious issue became apparent.
>  
> Dispatcher: 23 started 22:02:42:647

Re: [Spark RPC] Help how to debug sudden performance issue

2019-03-11 Thread Jörn Franke
Well, it is a little bit difficult to say because a lot of things are mixed up 
here. What function is calculated? Does it need a lot of memory? Could it be 
that you run out of memory, some spillover happens, and you have a lot of IO 
to disk which is blocking? 

Related to that could be the setup of 1 executor with 40 cores. How much memory does it 
have, and how much does it need?

I would not put Kafka+ZK on the server where the driver is running.

A different Spark version - that may depend on what are the answers to the 
questions above.

> On 11.03.2019 at 07:40, Hough, Stephen C wrote:
> 
> Spark Version: 2.0.2
>  
> I am running a cluster with 400 workers where each worker has 1 executor 
> configured with 40 cores for a total capacity of 16,000 cores.
> I run about 10,000 jobs with 1.5M tasks where the job is a simple 
> spark.parallelize(list, list.size()).map(func).collectAsync().  The job 
> contains a list of tasks ranging from 1 to 2000.
> ‘func’ will run our code which will do computations then post events over 
> Kafka and return a Boolean, we ignore the result.  The duration of this func 
> can be from seconds up to 20 mins.
>  
> The application context is launched on a driver server with 32 cores and the 
> only other services running on the box is a Kafka broker and zookeeper 
> service.
>  
> This particular batch in development environment took around 2 hours to run 
> which met our SLA however when we ran in production it took 3 hours to 
> complete, we thought it may have been due to another cluster we were running 
> with around 300 workers however AWS informed us that the networks were 
> isolated.  I moved the job to run later after the other clusters batch had 
> finished and the time reduced back down to 2 hrs.
> I analyzed our logs and it shows that a yet to be determined incident 
> @22:02:42 caused Spark to ‘go slow’.
>  
> By capturing the duration from the executor thread message ‘Finished task’ I 
> tracked the TID seen by the task result getter to determine duration until 
> the result is processed on the driver and a core is freed for the scheduler.
>  
> For the most part it is within a reasonable range of 10ms then suddenly at 
> the given incident time it suddenly rises to 5s, 20s, 587s, peaking at 32m 
> only 8 mins after the incident.  So it takes 32 mins from the time the result 
> was sent back to spark driver to the time it is processed which explains the 
> performance hit because during this time the freed cores on the worker go 
> idle waiting for a new task.  Note I did track the time I saw our Kafka event 
> sent by this task and we saw it roughly 2ms later on the driver so the 
> results are getting to server over the network okay.
>  
> Looking at the RPC code it became apparent to me that if we start to see a 
> build-up of messages the dispatcher should turn single-threaded as it processes 
> the backlog, so I did another scan of the driver logs to look for long 
> running dispatcher threads, i.e. a dispatcher that processes more than 1 
> consecutive message.  A very obvious issue became apparent.
>  
> Dispatcher: 23 started 22:02:42:647 processed 80386 consecutive messages for 
> a duration of 53 minutes.
>  
> If one looks at the beginning of these messages it is obvious that a slowdown 
> occurs, the first 3 are within millis of each other, then a suspicious 100ms 
> delay starts happening.
>  
> 04-03-19 22:02:43:032 [INFO] [dispatcher-event-loop-23] 
> o.a.s.s.c.CoarseGrainedSchedulerBackend$DriverEndpoint - Launching task 
> 1418419 on executor id: 1048 hostname: 10.9.141.180
> 04-03-19 22:02:43:034 [INFO] [dispatcher-event-loop-23] 
> o.a.s.s.c.CoarseGrainedSchedulerBackend$DriverEndpoint - Launching task 
> 1418420 on executor id: 967 hostname: 10.9.134.69
> 04-03-19 22:02:43:037 [INFO] [dispatcher-event-loop-23] 
> o.a.s.s.c.CoarseGrainedSchedulerBackend$DriverEndpoint - Launching task 
> 1418421 on executor id: 791 hostname: 10.9.139.73
> 04-03-19 22:02:43:136 [INFO] [dispatcher-event-loop-23] 
> o.a.s.s.c.CoarseGrainedSchedulerBackend$DriverEndpoint - Launching task 
> 1418422 on executor id: 941 hostname: 10.9.142.127
> 04-03-19 22:02:43:234 [INFO] [dispatcher-event-loop-23] 
> o.a.s.s.c.CoarseGrainedSchedulerBackend$DriverEndpoint - Launching task 
> 1418423 on executor id: 1085 hostname: 10.9.142.23
> 04-03-19 22:02:43:348 [INFO] [dispatcher-event-loop-23] 
> o.a.s.s.c.CoarseGrainedSchedulerBackend$DriverEndpoint - Launching task 
> 1418424 on executor id: 944 hostname: 10.9.141.65
>  
> Unfortunately I can’t turn on any more extra logging for the DriverEndpoint 
> ‘StatusUpdate’ handler however at a guess I would say the launchTasks, 
> executorData.executorEndpoint.send operation is introducing some sort of 
> blocking which causes a backlog that takes time to process.
>  
> When the system is running okay we don’t see this behaviour.
>  
> Q, Have you guys seen this behaviour before, and if so would an update to 
> Spark 2.4 do the trick.
>  
> If not are there 

Re: [DISCUSS] Support decimals with negative scale in decimal operation

2019-01-09 Thread Jörn Franke
Maybe it is better to introduce a new datatype that supports negative scale; 
otherwise the migration and testing effort for organizations running Spark 
applications becomes too large. Of course, the current decimal would be kept as it 
is.

> On 07.01.2019 at 15:08, Marco Gaido wrote:
> 
> In general we can say that some datasources allow them, others fail. At the 
> moment, we are doing no casting before writing (so we can state so in the 
> doc). But since there is an ongoing discussion for DSv2, we can maybe add a 
> flag/interface there for "negative scale intolerant" data sources and try to cast 
> before writing to them. What do you think about this?
> 
>> On Mon, Jan 7, 2019 at 15:03, Wenchen Fan wrote:
>> AFAIK parquet spec says decimal scale can't be negative. If we want to 
>> officially support negative-scale decimal, we should clearly define the 
>> behavior when writing negative-scale decimals to parquet and other data 
>> sources. The most straightforward way is to fail for this case, but maybe we 
>> can do something better, like casting decimal(1, -20) to decimal(20, 0) 
>> before writing.
>> 
>>> On Mon, Jan 7, 2019 at 9:32 PM Marco Gaido  wrote:
>>> Hi Wenchen,
>>> 
>>> thanks for your email. I agree adding doc for decimal type, but I am not 
>>> sure what you mean speaking of the behavior when writing: we are not 
>>> performing any automatic casting before writing; if we want to do that, we 
>>> need a design about it I think.
>>> 
>>> I am not sure if it makes sense to set a min for it. That would break 
>>> backward compatibility (for very weird use case), so I wouldn't do that.
>>> 
>>> Thanks,
>>> Marco
>>> 
 On Mon, Jan 7, 2019 at 05:53, Wenchen Fan wrote:
 I think we need to do this for backward compatibility, and according to 
 the discussion in the doc, SQL standard allows negative scale.
 
 To do this, I think the PR should also include a doc for the decimal type, 
 like the definition of precision and scale(this one looks pretty good), 
 and the result type of decimal operations, and the behavior when writing 
 out decimals(e.g. we can cast decimal(1, -20) to decimal(20, 0) before 
 writing).
 
 Another question is, shall we set a min scale? e.g. shall we allow 
 decimal(1, -1000)?
 
> On Thu, Oct 25, 2018 at 9:49 PM Marco Gaido  
> wrote:
> Hi all,
> 
> a bit more than one month ago, I sent a proposal for handling properly 
> decimals with negative scales in our operations. This is a long standing 
> problem in our codebase as we derived our rules from Hive and SQLServer 
> where negative scales are forbidden, while in Spark they are not.
> 
> The discussion has been stale for a while now. No more comments on the 
> design doc: 
> https://docs.google.com/document/d/17ScbMXJ83bO9lx8hB_jeJCSryhT9O_HDEcixDq0qmPk/edit#heading=h.x7062zmkubwm.
> 
> So I am writing this e-mail in order to check whether there are more 
> comments on it or we can go ahead with the PR.
> 
> Thanks,
> Marco
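
For readers less familiar with negative scales, a small plain-JVM illustration of what they 
mean and what the cast discussed above (e.g. decimal(1, -20) to decimal(20, 0)) does; nothing 
here is Spark-specific:

import java.math.BigDecimal

val d = BigDecimal.valueOf(7, -2)                        // unscaled value 7, scale -2, i.e. 7 * 10^2
println(d)                                               // prints 7E+2 (the value 700)
println(s"precision=${d.precision}, scale=${d.scale}")   // precision=1, scale=-2

// The cast direction discussed in the thread, expressed in plain BigDecimal terms:
val widened = BigDecimal.valueOf(7, -20).setScale(0)     // 700000000000000000000 with scale 0
println(s"precision=${widened.precision}, scale=${widened.scale}")  // precision=21, scale=0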


Re: Self join

2018-12-11 Thread Jörn Franke
I don’t know your exact underlying business problem, but maybe a graph 
solution such as Spark GraphX meets your requirements better. Usually 
self-joins are done to address some kind of graph problem (even if you would 
not describe it as such), and a graph engine is much more efficient for these kinds of problems. 

> On 11.12.2018 at 12:44, Marco Gaido wrote:
> 
> Hi all,
> 
> I'd like to bring to the attention of a more people a problem which has been 
> there for long, ie, self joins. Currently, we have many troubles with them. 
> This has been reported several times to the community and seems to affect 
> many people, but as of now no solution has been accepted for it.
> 
> I created a PR some time ago in order to address the problem 
> (https://github.com/apache/spark/pull/21449), but Wenchen mentioned he tried 
> to fix this problem too but so far no attempt was successful because there is 
> no clear semantic 
> (https://github.com/apache/spark/pull/21449#issuecomment-393554552).
> 
> So I'd like to propose to discuss here which is the best approach for 
> tackling this issue, which I think would be great to fix for 3.0.0, so if we 
> decide to introduce breaking changes in the design, we can do that.
> 
> Thoughts on this?
> 
> Thanks,
> Marco
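
For context, a minimal sketch of the ambiguity being discussed and of the aliasing 
workaround people use today; the column names are made up:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("self-join-demo").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq((1, "a"), (2, "b")).toDF("id", "value")

// df("id") === df("id") resolves both sides to the same attribute, which is exactly the
// unclear semantic mentioned above. Aliasing both sides makes the intent explicit:
val joined = df.as("l").join(df.as("r"), $"l.id" === $"r.id")
joined.show()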


Re: Pushdown in DataSourceV2 question

2018-12-09 Thread Jörn Franke
It is not about lying or not, or trust or not. Some or all filters may not be 
supported by a data source. Some might only be applied under certain 
environmental conditions (e.g. enough memory). 

It is much more expensive to communicate between Spark and a data source about which 
filters have been applied than to just check it, as Spark does, 
especially if you have several different data sources at the same time (joins 
etc.).

> On 09.12.2018 at 14:30, Wenchen Fan wrote:
> 
> expressions/functions can be expensive and I do think Spark should trust data 
> source and not re-apply pushed filters. If data source lies, many things can 
> go wrong...
> 
>> On Sun, Dec 9, 2018 at 8:17 PM Jörn Franke  wrote:
>> Well even if it has to apply it again, if pushdown is activated then it will 
>> be much less cost for spark to see if the filter has been applied or not. 
>> Applying the filter is negligible, what it really avoids if the file format 
>> implements it is IO cost (for reading) as well as cost for converting from 
>> the file format internal datatype to the one of Spark. Those two things are 
>> very expensive, but not the filter check. In the end, it could be also data 
>> source internal reasons not to apply a filter (there can be many depending 
>> on your scenario, the format etc). Instead of “discussing” between Spark and 
>> the data source it is much less costly that Spark checks that the filters 
>> are consistently applied.
>> 
>>> On 09.12.2018 at 12:39, Alessandro Solimando wrote:
>>> 
>>> Hello,
>>> that's an interesting question, but after Frank's reply I am a bit puzzled.
>>> 
>>> If there is no control over the pushdown status how can Spark guarantee the 
>>> correctness of the final query?
>>> 
>>> Consider a filter pushed down to the data source, either Spark has to know 
>>> if it has been applied or not, or it has to re-apply the filter anyway (and 
>>> pay the price for that).
>>> 
>>> Is there any other option I am not considering?
>>> 
>>> Best regards,
>>> Alessandro
>>> 
>>> On Sat, Dec 8, 2018 at 12:32, Jörn Franke wrote:
>>>> BTW. Even for json a pushdown can make sense to avoid that data is 
>>>> unnecessary ending in Spark ( because it would cause unnecessary 
>>>> overhead). 
>>>> In the datasource v2 api you need to implement a SupportsPushDownFilter
>>>> 
>>>> > On 08.12.2018 at 10:50, Noritaka Sekiyama wrote:
>>>> > 
>>>> > Hi,
>>>> > 
>>>> > I'm a support engineer, interested in DataSourceV2.
>>>> > 
>>>> > Recently I had some pain to troubleshoot to check if pushdown is 
>>>> > actually applied or not.
>>>> > I noticed that DataFrame's explain() method shows pushdown even for JSON.
>>>> > It totally depends on DataSource side, I believe. However, I would like 
>>>> > Spark to have some way to confirm whether specific pushdown is actually 
>>>> > applied in DataSource or not.
>>>> > 
>>>> > # Example
>>>> > val df = spark.read.json("s3://sample_bucket/people.json")
>>>> > df.printSchema()
>>>> > df.filter($"age" > 20).explain()
>>>> > 
>>>> > root
>>>> >  |-- age: long (nullable = true)
>>>> >  |-- name: string (nullable = true)
>>>> > 
>>>> > == Physical Plan ==
>>>> > *Project [age#47L, name#48]
>>>> > +- *Filter (isnotnull(age#47L) && (age#47L > 20))
>>>> >+- *FileScan json [age#47L,name#48] Batched: false, Format: JSON, 
>>>> > Location: InMemoryFileIndex[s3://sample_bucket/people.json], 
>>>> > PartitionFilters: [], PushedFilters: [IsNotNull(age), 
>>>> > GreaterThan(age,20)], ReadSchema: struct
>>>> > 
>>>> > # Comments
>>>> > As you can see, PushedFilter is shown even if input data is JSON.
>>>> > Actually this pushdown is not used.
>>>> >
>>>> > I'm wondering if it has been already discussed or not.
>>>> > If not, this is a chance to have such feature in DataSourceV2 because it 
>>>> > would require some API level changes.
>>>> > 
>>>> > 
>>>> > Warm regards,
>>>> > 
>>>> > Noritaka Sekiyama
>>>> > 
>>>> 
>>>> 


Re: Pushdown in DataSourceV2 question

2018-12-09 Thread Jörn Franke
Well, even if Spark has to apply it again, if pushdown is activated it costs Spark much 
less to see whether the filter has been applied or not. Applying 
the filter is negligible; what pushdown really avoids, if the file format implements 
it, is IO cost (for reading) as well as the cost of converting from the file format's 
internal datatype to Spark's. Those two things are very expensive, but 
not the filter check. In the end, there could also be data-source-internal reasons 
not to apply a filter (there can be many, depending on your scenario, the format, 
etc.). Instead of “discussing” between Spark and the data source, it is much less 
costly that Spark checks that the filters are consistently applied.

> On 09.12.2018 at 12:39, Alessandro Solimando wrote:
> 
> Hello,
> that's an interesting question, but after Frank's reply I am a bit puzzled.
> 
> If there is no control over the pushdown status how can Spark guarantee the 
> correctness of the final query?
> 
> Consider a filter pushed down to the data source, either Spark has to know if 
> it has been applied or not, or it has to re-apply the filter anyway (and pay 
> the price for that).
> 
> Is there any other option I am not considering?
> 
> Best regards,
> Alessandro
> 
> On Sat, Dec 8, 2018 at 12:32, Jörn Franke wrote:
>> BTW. Even for json a pushdown can make sense to avoid that data is 
>> unnecessary ending in Spark ( because it would cause unnecessary overhead). 
>> In the datasource v2 api you need to implement a SupportsPushDownFilter
>> 
>> > On 08.12.2018 at 10:50, Noritaka Sekiyama wrote:
>> > 
>> > Hi,
>> > 
>> > I'm a support engineer, interested in DataSourceV2.
>> > 
>> > Recently I had some pain to troubleshoot to check if pushdown is actually 
>> > applied or not.
>> > I noticed that DataFrame's explain() method shows pushdown even for JSON.
>> > It totally depends on DataSource side, I believe. However, I would like 
>> > Spark to have some way to confirm whether specific pushdown is actually 
>> > applied in DataSource or not.
>> > 
>> > # Example
>> > val df = spark.read.json("s3://sample_bucket/people.json")
>> > df.printSchema()
>> > df.filter($"age" > 20).explain()
>> > 
>> > root
>> >  |-- age: long (nullable = true)
>> >  |-- name: string (nullable = true)
>> > 
>> > == Physical Plan ==
>> > *Project [age#47L, name#48]
>> > +- *Filter (isnotnull(age#47L) && (age#47L > 20))
>> >+- *FileScan json [age#47L,name#48] Batched: false, Format: JSON, 
>> > Location: InMemoryFileIndex[s3://sample_bucket/people.json], 
>> > PartitionFilters: [], PushedFilters: [IsNotNull(age), 
>> > GreaterThan(age,20)], ReadSchema: struct
>> > 
>> > # Comments
>> > As you can see, PushedFilter is shown even if input data is JSON.
>> > Actually this pushdown is not used.
>> >
>> > I'm wondering if it has been already discussed or not.
>> > If not, this is a chance to have such feature in DataSourceV2 because it 
>> > would require some API level changes.
>> > 
>> > 
>> > Warm regards,
>> > 
>> > Noritaka Sekiyama
>> > 
>> 
>> 


Re: Pushdown in DataSourceV2 question

2018-12-08 Thread Jörn Franke
BTW, even for JSON a pushdown can make sense, to avoid data unnecessarily 
ending up in Spark (because it would cause unnecessary overhead). 
In the DataSource V2 API you need to implement SupportsPushDownFilters

> On 08.12.2018 at 10:50, Noritaka Sekiyama wrote:
> 
> Hi,
> 
> I'm a support engineer, interested in DataSourceV2.
> 
> Recently I had some pain to troubleshoot to check if pushdown is actually 
> applied or not.
> I noticed that DataFrame's explain() method shows pushdown even for JSON.
> It totally depends on DataSource side, I believe. However, I would like Spark 
> to have some way to confirm whether specific pushdown is actually applied in 
> DataSource or not.
> 
> # Example
> val df = spark.read.json("s3://sample_bucket/people.json")
> df.printSchema()
> df.filter($"age" > 20).explain()
> 
> root
>  |-- age: long (nullable = true)
>  |-- name: string (nullable = true)
> 
> == Physical Plan ==
> *Project [age#47L, name#48]
> +- *Filter (isnotnull(age#47L) && (age#47L > 20))
>+- *FileScan json [age#47L,name#48] Batched: false, Format: JSON, 
> Location: InMemoryFileIndex[s3://sample_bucket/people.json], 
> PartitionFilters: [], PushedFilters: [IsNotNull(age), GreaterThan(age,20)], 
> ReadSchema: struct
> 
> # Comments
> As you can see, PushedFilter is shown even if input data is JSON.
> Actually this pushdown is not used.
>
> I'm wondering if it has been already discussed or not.
> If not, this is a chance to have such feature in DataSourceV2 because it 
> would require some API level changes.
> 
> 
> Warm regards,
> 
> Noritaka Sekiyama
> 
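
For readers who have not seen it, a hedged sketch of the shape of that contract, using the 
Spark 3.x interface names under org.apache.spark.sql.connector.read (the 2.3/2.4 preview had 
the same methods under org.apache.spark.sql.sources.v2.reader); the "only understands 
GreaterThan" rule is an illustrative assumption:

import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters}
import org.apache.spark.sql.sources.{Filter, GreaterThan}

class MyScanBuilder extends ScanBuilder with SupportsPushDownFilters {
  private var pushed: Array[Filter] = Array.empty

  // Spark hands over candidate filters; the source keeps the ones it can evaluate and
  // returns the rest, which Spark will evaluate itself after the scan.
  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    val (supported, unsupported) = filters.partition {
      case _: GreaterThan => true
      case _              => false
    }
    pushed = supported
    unsupported
  }

  // This is what explain() reports as PushedFilters - i.e. what the source *claims*,
  // which is the crux of the discussion above.
  override def pushedFilters(): Array[Filter] = pushed

  override def build(): Scan = ???   // out of scope for this sketch
}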




Re: Pushdown in DataSourceV2 question

2018-12-08 Thread Jörn Franke
It was already available before DataSourceV2, but I think it might have been an 
internal/semi-official API (e.g. JSON has been an internal datasource for some time 
now). The filters were provided to the datasource, but you will never know whether 
the datasource has indeed leveraged them or whether it decided to ignore the filters 
for other reasons (e.g. it would be inefficient in specific cases).

> On 08.12.2018 at 10:50, Noritaka Sekiyama wrote:
> 
> Hi,
> 
> I'm a support engineer, interested in DataSourceV2.
> 
> Recently I had some pain to troubleshoot to check if pushdown is actually 
> applied or not.
> I noticed that DataFrame's explain() method shows pushdown even for JSON.
> It totally depends on DataSource side, I believe. However, I would like Spark 
> to have some way to confirm whether specific pushdown is actually applied in 
> DataSource or not.
> 
> # Example
> val df = spark.read.json("s3://sample_bucket/people.json")
> df.printSchema()
> df.filter($"age" > 20).explain()
> 
> root
>  |-- age: long (nullable = true)
>  |-- name: string (nullable = true)
> 
> == Physical Plan ==
> *Project [age#47L, name#48]
> +- *Filter (isnotnull(age#47L) && (age#47L > 20))
>+- *FileScan json [age#47L,name#48] Batched: false, Format: JSON, 
> Location: InMemoryFileIndex[s3://sample_bucket/people.json], 
> PartitionFilters: [], PushedFilters: [IsNotNull(age), GreaterThan(age,20)], 
> ReadSchema: struct
> 
> # Comments
> As you can see, PushedFilter is shown even if input data is JSON.
> Actually this pushdown is not used.
>
> I'm wondering if it has been already discussed or not.
> If not, this is a chance to have such feature in DataSourceV2 because it 
> would require some API level changes.
> 
> 
> Warm regards,
> 
> Noritaka Sekiyama
> 




Re: Spark Utf 8 encoding

2018-11-10 Thread Jörn Franke
Is the original file indeed UTF-8? Windows environments especially tend to mess 
up files (e.g. Java on Windows does not use UTF-8 by default). However, 
the software that processed the data before could also have modified it.
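
A hedged sketch of one way to check that, assuming a spark-shell session: read the raw bytes 
via the Hadoop API and decode them with an explicit charset (Latin-1 here, as a guess for a 
Danish 'ø'), so the platform default charset is taken out of the picture; the path is a 
placeholder:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import java.nio.charset.StandardCharsets

val raw = spark.sparkContext.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///path/to/file.bz2")

// Text exposes the undecoded bytes; decode explicitly instead of trusting any default charset.
val decoded = raw.map { case (_, text) =>
  new String(text.getBytes, 0, text.getLength, StandardCharsets.ISO_8859_1)
}
decoded.take(5).foreach(println)

If the Latin-1 decoding shows the string correctly, the file is most likely not UTF-8 to 
begin with.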

> On 10.11.2018 at 02:17, lsn24 wrote:
> 
> Hello,
> 
> Per the documentation, the default character encoding of Spark is UTF-8. But
> when I try to read non-ASCII characters, Spark tends to read them as question
> marks. What am I doing wrong? Below is my syntax:
> 
> val ds = spark.read.textFile("a .bz2 file from hdfs");
> ds.show();
> 
> The string "KøBENHAVN"  gets displayed as "K�BENHAVN"
> 
> I did the testing on the Spark shell and ran the same command as part of a Spark
> job. Both yield the same result.
> 
> I don't know what I am missing . I read the documentation, I couldn't find
> any explicit config etc.
> 
> Any pointers will be greatly appreciated!
> 
> Thanks
> 
> 
> 
> 
> 




Re: Coalesce behaviour

2018-10-15 Thread Jörn Franke
This is not fully correct. If you have fewer files, then you need to move some 
data to other nodes, because not all of the data is there for writing (this is even 
the case for the same node, but then it is easier from a network perspective). 
Hence a shuffle is needed.
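
The trade-off being discussed, sketched on the DataFrame side (paths are placeholders, and a 
SparkSession named spark is assumed):

val result = spark.read.parquet("/in").groupBy("key").count()   // heavy aggregation with many shuffle partitions

// Variant 1: no extra shuffle, but the whole last stage (including the aggregation) runs with only 20 tasks.
result.coalesce(20).write.parquet("/out_coalesce")

// Variant 2: one extra shuffle, but the aggregation keeps its parallelism and only the final write uses 20 tasks.
result.repartition(20).write.parquet("/out_repartition")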


> On 15.10.2018 at 05:04, Koert Kuipers wrote:
> 
> sure, i understand currently the workaround is to add a shuffle. but that's 
> just a workaround, not a satisfactory solution: we shouldn't have to 
> introduce another shuffle (an expensive operation) just to reduce the number 
> of files.
> 
> logically all you need is a map-phase with less tasks after the reduce phase 
> with many tasks to reduce the number of files, but there is currently no way 
> to express this in spark. it seems the map operation always gets tagged on to 
> the end of the previous reduce operation, which is generally a reasonable 
> optimization, but not here since it causes the tasks for the reduce to go 
> down which is unacceptable.
> 
>> On Sun, Oct 14, 2018 at 10:06 PM Wenchen Fan  wrote:
>> You have a heavy workload, you want to run it with many tasks for better 
>> performance and stability(no OMM), but you also want to run it with few 
>> tasks to avoid too many small files. The reality is, mostly you can't reach 
>> these 2 goals together, they conflict with each other. The solution I can 
>> think of is to sacrifice performance a little: run the workload with many 
>> tasks at first, and then merge the many small files. Generally this is how 
>> `coalesce(n, shuffle = true)` does.
>> 
>>> On Sat, Oct 13, 2018 at 10:05 PM Koert Kuipers  wrote:
>>> we have a collection of programs in dataframe api that all do big shuffles 
>>> for which we use 2048+ partitions. this works fine but it produces a lot of 
>>> (small) output files, which put pressure on the memory of the drivers 
>>> programs of any spark program that reads this data in again.
>>> 
>>> so one of our developers stuck in a .coalesce at the end of every program 
>>> just before writing to disk to reduce the output files thinking this would 
>>> solve the many files issue. to his surprise the coalesce caused the 
>>> existing shuffles to run with less tasks, leading to unacceptable slowdowns 
>>> and OOMs. so this is not a solution.
>>> 
>>> how can we insert a coalesce as a new map-phase (new job on application 
>>> manager with narrow dependency) instead of modifying the existing reduce 
>>> phase? i am saying map-phase because it should not introduce a new shuffle: 
>>> this is wasteful and unnecessary.
>>> 
>>> 
 On Sat, Oct 13, 2018 at 1:39 AM Wenchen Fan  wrote:
 In your first example, the root RDD has 1000 partitions, then you do a 
 shuffle (with repartitionAndSortWithinPartitions), and shuffles data to 
 1000 reducers. Then you do coalesce, which asks Spark to launch only 20 
 reducers to process the data which were prepared for 1000 reducers. Since 
 the reducers have heavy work (sorting), you OOM. In general, your work 
 flow is: 1000 mappers -> 20 reducers.
 
 In your second example, the coalesce introduces shuffle, so your work flow 
 is: 1000 mappers -> 1000 reducers(also mappers) -> 20 reducers. The 
 sorting is done by 1000 tasks so no OOM.
 
 BTW have you tried DataFrame API? With Spark SQL, the memory management is 
 more precise, so even we only have 20 tasks to do the heavy sorting, the 
 system should just have more disk spills instead of OOM.
 
 
> On Sat, Oct 13, 2018 at 11:35 AM Koert Kuipers  wrote:
> how can i get a shuffle with 2048 partitions and 2048 tasks and then a 
> map phase with 10 partitions and 10 tasks that writes to hdfs?
> 
> every time i try to do this using coalesce the shuffle ends up having 10 
> tasks which is unacceptable due to OOM. this makes coalesce somewhat 
> useless.
> 
>> On Wed, Oct 10, 2018 at 9:06 AM Wenchen Fan  wrote:
>> Note that, RDD partitions and Spark tasks are not always 1-1 mapping.
>> 
>> Assuming `rdd1` has 100 partitions, and `rdd2 = rdd1.coalesce(10)`. Then 
>> `rdd2` has 10 partitions, and there is no shuffle between `rdd1` and 
>> `rdd2`. During scheduling, `rdd1` and `rdd2` are in the same stage, and 
>> this stage has 10 tasks (decided by the last RDD). This means, each 
>> Spark task will process 10 partitions of `rdd1`.
>> 
>> Looking at your example, I don't see where is the problem. Can you 
>> describe what is not expected?
>> 
>>> On Wed, Oct 10, 2018 at 2:11 PM Sergey Zhemzhitsky  
>>> wrote:
>>> Well, it seems that I can still extend the CoalesceRDD to make it 
>>> preserve the total number of partitions from the parent RDD, reduce 
>>> some partitons in the same way as the original coalesce does for 
>>> map-only jobs and fill the gaps (partitions which should reside on the 
>>> positions of the coalesced ones) with just a special kind of 

Re: Remove Flume support in 3.0.0?

2018-10-10 Thread Jörn Franke
I think it makes sense to remove it. 
If it is not too much effort, and the architecture of the Flume source is not 
considered too strange, one may extract it as a separate project and put it 
on GitHub in a dedicated non-supported repository. This would enable 
distributors and other companies to continue to use it with minor adaptations in 
case their architecture depends on it. Furthermore, if there is growing 
interest then someone could pick it up and create a clean connector based on the 
current Spark architecture, to be available as a dedicated connector or again in 
later Spark versions.

That being said, there are also "indirect" ways to use Flume with Spark (e.g. via 
Kafka), so I believe people would not be affected so much by a removal.

(Non-voting, just my opinion)

> On 10.10.2018 at 22:31, Sean Owen wrote:
> 
> Marcelo makes an argument that Flume support should be removed in
> 3.0.0 at https://issues.apache.org/jira/browse/SPARK-25598
> 
> I tend to agree. Is there an argument that it needs to be supported,
> and can this move to Bahir if so?
> 
> 




Re: Support for Second level of concurrency

2018-09-25 Thread Jörn Franke
What is the ultimate goal of this algorithm? There may already be algorithms 
that can do this within Spark. You could also put a message on Kafka (or 
another broker) and have Spark applications listen to it to trigger further 
computation. This would also be more controlled, and it can be done already today.
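
For the concrete pairwise-comparison case described below, one common way to keep everything 
inside a single Spark job - without needing a SparkContext on the executors - is to broadcast 
the smaller vector and enumerate the pairs in the flatMap. A hedged sketch; correlate and the 
sample data are placeholders, and it assumes the right-hand vector fits in executor memory:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("pairwise-correlation").getOrCreate()
val sc = spark.sparkContext

def correlate(l: Array[Double], r: Array[Double]): Double =
  l.zip(r).map { case (a, b) => a * b }.sum          // stand-in for the real correlation logic

val leftRows: Array[Array[Double]]  = Array(Array(1.0, 2.0), Array(3.0, 4.0))
val rightRows: Array[Array[Double]] = Array(Array(5.0, 6.0), Array(7.0, 8.0))

val rightBroadcast = sc.broadcast(rightRows)         // shipped once per executor
val left = sc.parallelize(leftRows, numSlices = 4)

// Each left row is compared against every right row; Spark already runs these tasks
// concurrently across the cluster, so no nested job submission from the executors is needed.
val scores = left.flatMap { lrow =>
  rightBroadcast.value.iterator.map(rrow => correlate(lrow, rrow))
}
scores.take(5).foreach(println)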

> On 25. Sep 2018, at 17:31, sandeep mehandru  
> wrote:
> 
> Hi Folks,
> 
>   There is a use-case , where we are doing large computation on two large
> vectors. It is basically a scenario, where we run a flatmap operation on the
> Left vector and run co-relation logic by comparing it with all the rows of
> the second vector. When this flatmap operation is running on an executor,
> this compares row 1 from left vector with all rows of the second vector. The
> goal is that from this flatmap operation, we want to start another remote
> map operation that compares a portion of right vector rows. This enables a
> second level of concurrent operation, thereby increasing throughput and
> utilizing other nodes. But to achieve this we need access to spark context
> from within the Flatmap operation.
> 
> I have attached a snapshot describing the limitation.
> 
> 
>  
> 
> In simple words, this boils down to having access to  a spark context from
> within an executor , so that the next level of map or concurrent operations
> can be spun up on the partitions on other machines. I have some experience with
> other in-memory compute grid technologies like Coherence and Hazelcast. These
> frameworks do allow triggering the next level of concurrent operations from
> within a task being executed on one node.
> 
> 
> Regards,
> Sandeep.
> 
> 
> 
> 




Re: Spark Streaming : Multiple sources found for csv : Error

2018-08-30 Thread Jörn Franke
Can’t you remove the dependency on the Databricks CSV data source? Spark has had 
CSV integrated for some versions now, so it is not needed.
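
A hedged sketch of what the stream looks like once the com.databricks:spark-csv dependency 
is dropped from the fat jar; the schema and path are placeholders:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder().appName("csv-stream").getOrCreate()

val userSchema = new StructType()
  .add(StructField("name", StringType))
  .add(StructField("age", IntegerType))

// With only one CSV source left on the classpath, the short name "csv" is unambiguous again.
val csvdf = spark.readStream
  .option("sep", ",")
  .schema(userSchema)
  .csv("/path/to/incoming")   // a directory watched for new files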

> On 31. Aug 2018, at 05:52, Srabasti Banerjee  
> wrote:
> 
> Hi,
> 
> I am trying to run below code to read file as a dataframe onto a Stream (for 
> Spark Streaming) developed via Eclipse IDE, defining schemas appropriately, 
> by running thin jar on server and am getting error below. Tried out 
> suggestions from researching on internet based on 
> "spark.read.option.schema.csv" similar errors with no success.
> 
> Am thinking this can be a bug as the changes might not have been done for 
> readStream option? Has anybody encountered similar issue for Spark Streaming?
> 
> Looking forward to hear your response(s)!
> 
> Thanks
> Srabasti Banerjee
> 
> Error
> Exception in thread "main" java.lang.RuntimeException: Multiple sources found 
> for csv (com.databricks.spark.csv.DefaultSource15, 
> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat), please specify 
> the fully qualified class name.
> 
> Code:
> val csvdf = spark.readStream.option("sep", 
> ",").schema(userSchema).csv("server_path") //does not resolve error
> val csvdf = spark.readStream.option("sep", 
> ",").schema(userSchema).format("com.databricks.spark.csv").csv("server_path") 
> //does not resolve error
> val csvdf = spark.readStream.option("sep", 
> ",").schema(userSchema).csv("server_path") //does not resolve error
> val csvdf = spark.readStream.option("sep", 
> ",").schema(userSchema).format("org.apache.spark.sql.execution.datasources.csv").csv("server_path")
>  //does not resolve error
> val csvdf = spark.readStream.option("sep", 
> ",").schema(userSchema).format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").csv("server_path")
>  //does not resolve error
> val csvdf = spark.readStream.option("sep", 
> ",").schema(userSchema).format("com.databricks.spark.csv.DefaultSource15").csv("server_path")
>  //does not resolve error
> 
> 
>
> 


Re: Integrating ML/DL frameworks with Spark

2018-05-08 Thread Jörn Franke
Hi,

You misunderstood me. What I wanted to say is exactly that Spark should be aware of 
them, so I agree with you. The point is to also have the YARN GPU/FPGA 
scheduling as an option alongside a potential Spark GPU/FPGA scheduler.

For the other proposal - yes, the interfaces are slow, but one has to think about in 
which part they need to be improved for optimal performance: the ML framework, Spark, 
or both. My gut feeling is both. 

Best regards

> On 8. May 2018, at 07:11, Reynold Xin <r...@databricks.com> wrote:
> 
> I don't think it's sufficient to have them in YARN (or any other services) 
> without Spark aware of them. If Spark is not aware of them, then there is no 
> way to really efficiently utilize these accelerators when you run anything 
> that require non-accelerators (which is almost 100% of the cases in real 
> world workloads).
> 
> For the other two, the point is not to implement all the ML/DL algorithms in 
> Spark, but make Spark integrate well with ML/DL frameworks. Otherwise you 
> will have the problems I described (super low performance when exchanging 
> data between Spark and ML/DL frameworks, and hanging issues with MPI-based 
> programs).
> 
> 
>> On Mon, May 7, 2018 at 10:05 PM Jörn Franke <jornfra...@gmail.com> wrote:
>> Hadoop / Yarn 3.1 added GPU scheduling. 3.2 is planned to add FPGA 
>> scheduling, so it might be worth to have the last point generic that not 
>> only the Spark scheduler, but all supported schedulers can use GPU.
>> 
>> For the other 2 points I just wonder if it makes sense to address this in 
>> the ml frameworks themselves or in Spark.
>> 
>>> On 8. May 2018, at 06:59, Xiangrui Meng <m...@databricks.com> wrote:
>>> 
>>> Thanks Reynold for summarizing the offline discussion! I added a few 
>>> comments inline. -Xiangrui
>>> 
>>>> On Mon, May 7, 2018 at 5:37 PM Reynold Xin <r...@databricks.com> wrote:
>>>> Hi all,
>>>> 
>>>> Xiangrui and I were discussing with a heavy Apache Spark user last week on 
>>>> their experiences integrating machine learning (and deep learning) 
>>>> frameworks with Spark and some of their pain points. Couple things were 
>>>> obvious and I wanted to share our learnings with the list.
>>>> 
>>>> (1) Most organizations already use Spark for data plumbing and want to be 
>>>> able to run their ML part of the stack on Spark as well (not necessarily 
>>>> re-implementing all the algorithms but by integrating various frameworks 
>>>> like tensorflow, mxnet with Spark).
>>>> 
>>>> (2) The integration is however painful, from the systems perspective:
>>>> 
>>>> Performance: data exchange between Spark and other frameworks are slow, 
>>>> because UDFs across process boundaries (with native code) are slow. This 
>>>> works much better now with Pandas UDFs (given a lot of the ML/DL 
>>>> frameworks are in Python). However, there might be some low hanging fruit 
>>>> gaps here.
>>> The Arrow support behind Pandas UDFs can be reused to exchange data with 
>>> other frameworks. And one possibly performance improvement is to support 
>>> pipelining when supplying data to other frameworks. For example, while 
>>> Spark is pumping data from external sources into TensorFlow, TensorFlow 
>>> starts the computation on GPUs. This would significantly improve speed and 
>>> resource utilization.
>>>> Fault tolerance and execution model: Spark assumes fine-grained task 
>>>> recovery, i.e. if something fails, only that task is rerun. This doesn’t 
>>>> match the execution model of distributed ML/DL frameworks that are 
>>>> typically MPI-based, and rerunning a single task would lead to the entire 
>>>> system hanging. A whole stage needs to be re-run.
>>> This is not only useful for integrating with 3rd-party frameworks, but also 
>>> useful for scaling MLlib algorithms. One of my earliest attempts in Spark 
>>> MLlib was to implement All-Reduce primitive (SPARK-1485). But we ended up 
>>> with some compromised solutions. With the new execution model, we can set 
>>> up a hybrid cluster and do all-reduce properly.
>>>  
>>>> Accelerator-aware scheduling: The DL frameworks leverage GPUs and 
>>>> sometimes FPGAs as accelerators for speedup, and Spark’s scheduler isn’t 
>>>> aware of those resources, leading to either over-utilizing the 
>>>> accelerators or under-utilizing the CPUs.
>>>> 
>>>> The good thing is that none of these seem very difficult to address (and 
>>>> we have already made progress on one of them). Xiangrui has graciously 
>>>> accepted the challenge to come up with solutions and SPIP to these.
>>>> 
>>> 
>>> I will do more home work, exploring existing JIRAs or creating new JIRAs 
>>> for the proposal. We'd like to hear your feedback and past efforts along 
>>> those directions if they were not fully captured by our JIRA.
>>>  
>>>> Xiangrui - please also chime in if I didn’t capture everything. 
>>>> 
>>>> 
>>> -- 
>>> Xiangrui Meng
>>> Software Engineer
>>> Databricks Inc. 


Re: Integrating ML/DL frameworks with Spark

2018-05-07 Thread Jörn Franke
Hadoop/YARN 3.1 added GPU scheduling, and 3.2 is planned to add FPGA scheduling, 
so it might be worth making the last point generic so that not only the Spark 
scheduler but all supported schedulers can use GPUs.

For the other 2 points I just wonder whether it makes sense to address this in the 
ML frameworks themselves or in Spark.

> On 8. May 2018, at 06:59, Xiangrui Meng  wrote:
> 
> Thanks Reynold for summarizing the offline discussion! I added a few comments 
> inline. -Xiangrui
> 
>> On Mon, May 7, 2018 at 5:37 PM Reynold Xin  wrote:
>> Hi all,
>> 
>> Xiangrui and I were discussing with a heavy Apache Spark user last week on 
>> their experiences integrating machine learning (and deep learning) 
>> frameworks with Spark and some of their pain points. Couple things were 
>> obvious and I wanted to share our learnings with the list.
>> 
>> (1) Most organizations already use Spark for data plumbing and want to be 
>> able to run their ML part of the stack on Spark as well (not necessarily 
>> re-implementing all the algorithms but by integrating various frameworks 
>> like tensorflow, mxnet with Spark).
>> 
>> (2) The integration is however painful, from the systems perspective:
>> 
>> Performance: data exchange between Spark and other frameworks are slow, 
>> because UDFs across process boundaries (with native code) are slow. This 
>> works much better now with Pandas UDFs (given a lot of the ML/DL frameworks 
>> are in Python). However, there might be some low hanging fruit gaps here.
> The Arrow support behind Pandas UDFs can be reused to exchange data with other 
> frameworks. And one possibly performance improvement is to support pipelining 
> when supplying data to other frameworks. For example, while Spark is pumping 
> data from external sources into TensorFlow, TensorFlow starts the computation 
> on GPUs. This would significantly improve speed and resource utilization.
>> Fault tolerance and execution model: Spark assumes fine-grained task 
>> recovery, i.e. if something fails, only that task is rerun. This doesn’t 
>> match the execution model of distributed ML/DL frameworks that are typically 
>> MPI-based, and rerunning a single task would lead to the entire system 
>> hanging. A whole stage needs to be re-run.
> This is not only useful for integrating with 3rd-party frameworks, but also 
> useful for scaling MLlib algorithms. One of my earliest attempts in Spark 
> MLlib was to implement All-Reduce primitive (SPARK-1485). But we ended up 
> with some compromised solutions. With the new execution model, we can set up 
> a hybrid cluster and do all-reduce properly.
>  
>> Accelerator-aware scheduling: The DL frameworks leverage GPUs and sometimes 
>> FPGAs as accelerators for speedup, and Spark’s scheduler isn’t aware of 
>> those resources, leading to either over-utilizing the accelerators or 
>> under-utilizing the CPUs.
>> 
>> The good thing is that none of these seem very difficult to address (and we 
>> have already made progress on one of them). Xiangrui has graciously accepted 
>> the challenge to come up with solutions and SPIP to these.
>> 
> 
> I will do more home work, exploring existing JIRAs or creating new JIRAs for 
> the proposal. We'd like to hear your feedback and past efforts along those 
> directions if they were not fully captured by our JIRA.
>  
>> Xiangrui - please also chime in if I didn’t capture everything. 
>> 
>> 
> -- 
> Xiangrui Meng
> Software Engineer
> Databricks Inc. 
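
On the fault-tolerance/MPI point above: this is essentially what barrier execution mode 
(SPARK-24374, which landed later in Spark 2.4) provides - all tasks of the stage are 
scheduled together and a failure re-runs the whole stage instead of a single task. A minimal 
sketch, assuming Spark 2.4+ and a SparkContext named sc:

import org.apache.spark.BarrierTaskContext

val rdd = sc.parallelize(1 to 100, numSlices = 4)

val out = rdd.barrier().mapPartitions { iter =>
  val ctx = BarrierTaskContext.get()
  ctx.barrier()          // global synchronization point, similar to an MPI barrier
  // ... hand the partition over to the ML/DL framework here ...
  iter
}
out.count()              // triggers the barrier stage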


Re: Custom datasource as a wrapper for existing ones?

2018-05-03 Thread Jörn Franke
It changed from 2.0 to 2.1 to 2.2 ...
Not much, but it still changed. I somewhat agree that this is still manageable.

> On 3. May 2018, at 16:46, Wenchen Fan <cloud0...@gmail.com> wrote:
> 
> Hi Jakub,
> 
> Yea I think data source would be the most elegant way to solve your problem. 
> Unfortunately in Spark 2.3 the only stable data source API is data source v1, 
> which can't be used to implement high-performance data source. Data source v2 
> is still a preview version in Spark 2.3 and may change in the next release.
> 
> For now I'd suggest you take a look at `FileFormat`, which is the API for the 
> Spark builtin file-based data source like parquet. It's an internal API but 
> has not been changed for a long time. In the future, data source v2 would be 
> the best solution.
> 
> Thanks,
> Wenchen
> 
>> On Thu, May 3, 2018 at 4:17 AM, Jakub Wozniak <jakub.wozn...@cern.ch> wrote:
>> Hello,
>> 
>> Thanks a lot for your answers. 
>> 
>> We normally look for some stability so the use of internal APIs that are a 
>> subject to change with no warning are somewhat questionable. 
>> As to the approach of putting this functionality on top of Spark instead of 
>> a datasource - this works but poses a problem for Python. 
>> In Python we would like to reuse the code written in Java. An external lib 
>> in Java has to proxy to Python and Spark proxies as well. 
>> This means passing over objects (like SparkSession) back and forth from one 
>> jvm to the other. Not surprisingly this did not work for us in the past 
>> (although we did not push much hoping for the datasource).
>> All in all if we don’t find another solution we might go for an external 
>> library that most likely have to be reimplemented twice in Python… 
>> Or there might be a way to force our lib execution in the same JVM as Spark 
>> uses. To be seen… Again the most elegant way would be the datasource.
>> 
>> Cheers,
>> Jakub
>> 
>> 
>> > On 2 May 2018, at 21:07, Jörn Franke <jornfra...@gmail.com> wrote:
>> > 
>> > Some note on the internal API - it used to change with each release which 
>> > was quiet annoying because  other data sources (Avro, HadoopOffice etc) 
>> > had to follow up in this. In the end it is an internal API and thus does 
>> > not guarantee to be stable. If you want to have something stable you have 
>> > to use the official data source APIs with some disadvantages.
>> > 
>> >> On 2. May 2018, at 18:49, jwozniak <jakub.wozn...@cern.ch> wrote:
>> >> 
>> >> Hello,
>> >> 
>> >> At CERN we are developing a Big Data system called NXCALS that uses Spark 
>> >> as
>> >> Extraction API.
>> >> We have implemented a custom datasource that was wrapping 2 existing ones
>> >> (parquet and Hbase) in order to hide the implementation details (location 
>> >> of
>> >> the parquet files, hbase tables, etc) and to provide an abstraction layer 
>> >> to
>> >> our users. 
>> >> We have entered a stage where we execute some performance tests on our 
>> >> data
>> >> and we have noticed that this approach did not provide the expected
>> >> performance observed using pure Spark. In other words reading a parquet 
>> >> file
>> >> with some simple predicates behaves 15 times slower if the same code is
>> >> executed from within a custom datasource (that just uses Spark to read
>> >> parquet). 
>> >> After some investigation we've learnt that Spark did not apply the same
>> >> optimisations for both. 
>> >> We could see that in Spark 2.3.0 there was a new V2 version that abstracts
>> >> from SparkSession and focuses on low level Row API. 
>> >> Could you give us some suggestions of how to correctly implement our
>> >> datasource using the V2 API? 
>> >> Is this a correct way of doing it at all? 
>> >> 
>> >> What we want to achieve is to join existing datasources with some level of
>> >> additional abstraction on top. 
>> >> At the same time we want to profit from all catalyst & parquet 
>> >> optimisations
>> >> that exist for the original ones.
>> >> We also don't want to reimplement access to parquet files or Hbase at the
>> >> low level (like Row) but just profit from the Dataset API. 
>> >> We could have achieved the same by providing an external library on top of
>> >> Spark but the datasource approach looked like a more elegant solution. 
>> >> Only
>> >> the performance is still far from the desired one. 
>> >> 
>> >> Any help or direction in that matter would be greatly appreciated as we 
>> >> have
>> >> only started to build our Spark expertise yet.  
>> >> 
>> >> Best regards,
>> >> Jakub Wozniak
>> >> Software Engineer
>> >> CERN
>> >> 
>> >> 
>> >> 
>> >> --
>> >> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>> >> 
>> >> -
>> >> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>> >> 
>> 
> 


Re: Custom datasource as a wrapper for existing ones?

2018-05-02 Thread Jörn Franke
Some note on the internal API - it used to change with each release, which was 
quite annoying because other data sources (Avro, HadoopOffice etc.) had to 
follow up on this. In the end it is an internal API and thus is not guaranteed 
to be stable. If you want something stable you have to use the official data 
source APIs, with some disadvantages.
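
For illustration, a minimal sketch of a source built on the official (v1) API - 
package, class and column names are hypothetical, and note that this skeleton 
alone does not recover the optimisations discussed further down in this thread:

package com.example.wrapped

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._

// Used as spark.read.format("com.example.wrapped").option("table", "t").load()
class DefaultSource extends RelationProvider {
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation =
    new WrappedRelation(sqlContext, parameters("table"))
}

class WrappedRelation(val sqlContext: SQLContext, table: String)
    extends BaseRelation with PrunedFilteredScan {

  // Schema exposed to Spark (hypothetical columns).
  override def schema: StructType = StructType(Seq(
    StructField("id", LongType), StructField("value", StringType)))

  // Spark passes the required columns and the filters it could push down;
  // a wrapper translates what it can and delegates to the underlying source.
  override def buildScan(requiredColumns: Array[String],
                         filters: Array[Filter]): RDD[Row] = {
    val base = sqlContext.read.parquet(s"/data/$table")
    val filtered = filters.foldLeft(base) {
      case (df, EqualTo(attr, v))     => df.filter(col(attr) === v)
      case (df, GreaterThan(attr, v)) => df.filter(col(attr) > v)
      case (df, _)                    => df // anything else is re-applied by Spark
    }
    // A complete implementation would also handle an empty projection (count(*)).
    filtered.selectExpr(requiredColumns: _*).rdd
  }
}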

> On 2. May 2018, at 18:49, jwozniak  wrote:
> 
> Hello,
> 
> At CERN we are developing a Big Data system called NXCALS that uses Spark as
> Extraction API.
> We have implemented a custom datasource that was wrapping 2 existing ones
> (parquet and Hbase) in order to hide the implementation details (location of
> the parquet files, hbase tables, etc) and to provide an abstraction layer to
> our users. 
> We have entered a stage where we execute some performance tests on our data
> and we have noticed that this approach did not provide the expected
> performance observed using pure Spark. In other words reading a parquet file
> with some simple predicates behaves 15 times slower if the same code is
> executed from within a custom datasource (that just uses Spark to read
> parquet). 
> After some investigation we've learnt that Spark did not apply the same
> optimisations for both. 
> We could see that in Spark 2.3.0 there was a new V2 version that abstracts
> from SparkSession and focuses on low level Row API. 
> Could you give us some suggestions of how to correctly implement our
> datasource using the V2 API? 
> Is this a correct way of doing it at all? 
> 
> What we want to achieve is to join existing datasources with some level of
> additional abstraction on top. 
> At the same time we want to profit from all catalyst & parquet optimisations
> that exist for the original ones.
> We also don't want to reimplement access to parquet files or Hbase at the
> low level (like Row) but just profit from the Dataset API. 
> We could have achieved the same by providing an external library on top of
> Spark but the datasource approach looked like a more elegant solution. Only
> the performance is still far from the desired one. 
> 
> Any help or direction in that matter would be greatly appreciated as we have
> only started to build our Spark expertise yet.  
> 
> Best regards,
> Jakub Wozniak
> Software Engineer
> CERN
> 
> 
> 
> --
> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
> 
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Custom datasource as a wrapper for existing ones?

2018-05-02 Thread Jörn Franke
At some point in time Spark used an internal API (not the data source API) for 
the formats shipped with Spark (e.g. Parquet). You can look at how this is 
implemented for Parquet and co. in the Spark source code.

Maybe this is the issue you are facing?

Have you tried to put your encapsulation in a normal application instead of the 
data source to see if it shows the same performance issues?
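
As a quick way to check that, one could wrap the read in a plain library 
function and compare the physical plans with and without the wrapper - a rough 
sketch, with hypothetical paths and column names:

import org.apache.spark.sql.{DataFrame, SparkSession}

object ExtractionApi {
  // Plain library wrapper: Spark still sees a normal Parquet scan here,
  // so column pruning and filter pushdown apply as usual.
  def readSignals(spark: SparkSession, path: String): DataFrame =
    spark.read.parquet(path)
}

val spark = SparkSession.builder().appName("wrapper-check").getOrCreate()
val signals = ExtractionApi.readSignals(spark, "/data/signals")
// "PushedFilters" in the plan output confirms the optimisation was not lost.
signals.filter("device = 'MAGNET_1'").explain()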

> On 2. May 2018, at 18:49, jwozniak  wrote:
> 
> Hello,
> 
> At CERN we are developing a Big Data system called NXCALS that uses Spark as
> Extraction API.
> We have implemented a custom datasource that was wrapping 2 existing ones
> (parquet and Hbase) in order to hide the implementation details (location of
> the parquet files, hbase tables, etc) and to provide an abstraction layer to
> our users. 
> We have entered a stage where we execute some performance tests on our data
> and we have noticed that this approach did not provide the expected
> performance observed using pure Spark. In other words reading a parquet file
> with some simple predicates behaves 15 times slower if the same code is
> executed from within a custom datasource (that just uses Spark to read
> parquet). 
> After some investigation we've learnt that Spark did not apply the same
> optimisations for both. 
> We could see that in Spark 2.3.0 there was a new V2 version that abstracts
> from SparkSession and focuses on low level Row API. 
> Could you give us some suggestions of how to correctly implement our
> datasource using the V2 API? 
> Is this a correct way of doing it at all? 
> 
> What we want to achieve is to join existing datasources with some level of
> additional abstraction on top. 
> At the same time we want to profit from all catalyst & parquet optimisations
> that exist for the original ones.
> We also don't want to reimplement access to parquet files or Hbase at the
> low level (like Row) but just profit from the Dataset API. 
> We could have achieved the same by providing an external library on top of
> Spark but the datasource approach looked like a more elegant solution. Only
> the performance is still far from the desired one. 
> 
> Any help or direction in that matter would be greatly appreciated as we have
> only started to build our Spark expertise yet.  
> 
> Best regards,
> Jakub Wozniak
> Software Engineer
> CERN
> 
> 
> 
> --
> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
> 
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Best way to Hive to Spark migration

2018-04-05 Thread Jörn Franke
And the usual hint when migrating - do not only migrate but also optimize the 
ETL process design - this is what brings the most benefits.

> On 5. Apr 2018, at 08:18, Jörn Franke <jornfra...@gmail.com> wrote:
> 
> Ok this is not much detail, but you are probably best off if you migrate them 
> to SparkSQL.
> 
> It also depends on the Hive version and Spark version. If you have a recent one 
> with Tez+LLAP I would not expect that much difference. It can also be less 
> performant - Spark SQL only recently got some features such as a cost-based 
> optimizer.
> 
>> On 5. Apr 2018, at 08:02, Pralabh Kumar <pralabhku...@gmail.com> wrote:
>> 
>> Hi 
>> 
>> I have lot of ETL jobs (complex ones) , since they are SLA critical , I am 
>> planning them to migrate to spark.
>> 
>>> On Thu, Apr 5, 2018 at 10:46 AM, Jörn Franke <jornfra...@gmail.com> wrote:
>>> You need to provide more context on what you do currently in Hive and what 
>>> do you expect from the migration.
>>> 
>>>> On 5. Apr 2018, at 05:43, Pralabh Kumar <pralabhku...@gmail.com> wrote:
>>>> 
>>>> Hi Spark group
>>>> 
>>>> What's the best way to Migrate Hive to Spark
>>>> 
>>>> 1) Use HiveContext of Spark
>>>> 2) Use Hive on Spark 
>>>> (https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started)
>>>> 3) Migrate Hive to Calcite to Spark SQL
>>>> 
>>>> 
>>>> Regards
>>>> 
>> 


Re: Best way to Hive to Spark migration

2018-04-05 Thread Jörn Franke
Ok this is not much detail, but you are probably best off if you migrate them 
to SparkSQL.

It also depends on the Hive version and Spark version. If you have a recent one 
with Tez+LLAP I would not expect that much difference. It can also be less 
performant - Spark SQL only recently got some features such as a cost-based 
optimizer.
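
As a rough illustration of the SparkSQL route (table names are made up; 
enableHiveSupport points Spark at the existing metastore, so most HiveQL can 
stay as it is):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hive-etl-on-spark")
  .enableHiveSupport() // reuse the existing Hive metastore and tables
  .getOrCreate()

val daily = spark.sql(
  """SELECT customer_id, SUM(amount) AS total
    |FROM sales
    |GROUP BY customer_id""".stripMargin)

daily.write.mode("overwrite").saveAsTable("sales_daily_totals")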

> On 5. Apr 2018, at 08:02, Pralabh Kumar <pralabhku...@gmail.com> wrote:
> 
> Hi 
> 
> I have lot of ETL jobs (complex ones) , since they are SLA critical , I am 
> planning them to migrate to spark.
> 
>> On Thu, Apr 5, 2018 at 10:46 AM, Jörn Franke <jornfra...@gmail.com> wrote:
>> You need to provide more context on what you do currently in Hive and what 
>> do you expect from the migration.
>> 
>>> On 5. Apr 2018, at 05:43, Pralabh Kumar <pralabhku...@gmail.com> wrote:
>>> 
>>> Hi Spark group
>>> 
>>> What's the best way to Migrate Hive to Spark
>>> 
>>> 1) Use HiveContext of Spark
>>> 2) Use Hive on Spark 
>>> (https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started)
>>> 3) Migrate Hive to Calcite to Spark SQL
>>> 
>>> 
>>> Regards
>>> 
> 


Re: Best way to Hive to Spark migration

2018-04-04 Thread Jörn Franke
You need to provide more context on what you do currently in Hive and what do 
you expect from the migration.

> On 5. Apr 2018, at 05:43, Pralabh Kumar  wrote:
> 
> Hi Spark group
> 
> What's the best way to Migrate Hive to Spark
> 
> 1) Use HiveContext of Spark
> 2) Use Hive on Spark 
> (https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started)
> 3) Migrate Hive to Calcite to Spark SQL
> 
> 
> Regards
> 


Re: Spark scala development in Sbt vs Maven

2018-03-05 Thread Jörn Franke
I think most of the Scala development on Spark happens with sbt - in the open 
source world.

However, you can do it with Gradle and Maven as well. It depends on your 
organization and what your standard is.

Some things might be more cumbersome to reach in non-sbt Scala scenarios, but 
this is improving more and more.
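
For reference, a typical minimal sbt setup for a Spark application looks 
roughly like this (versions are only examples); sbt's incremental compiler then 
takes care of recompiling only what changed:

// build.sbt - Spark itself is marked "provided" so it stays out of the assembly jar
name := "my-spark-app"
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.2.1" % "provided",
  "org.apache.spark" %% "spark-sql"  % "2.2.1" % "provided"
)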

> On 5. Mar 2018, at 16:47, Swapnil Shinde  wrote:
> 
> Hello
>SBT's incremental compilation was a huge plus to build spark+scala 
> applications in SBT for some time. It seems Maven can also support 
> incremental compilation with Zinc server. Considering that, I am interested 
> to know communities experience -
> 
> 1. Spark documentation says SBT is being used by many contributors for day to 
> day development mainly because of incremental compilation. Considering Maven 
> is supporting incremental compilation through Zinc, do contributors prefer to 
> change from SBT to maven?
> 
> 2. Any issues /learning experiences with Maven + Zinc?
> 
> 3. Any other reasons to use SBT over Maven for scala development.
> 
> I understand SBT has many other advantages over Maven like cross version 
> publishing etc. but incremental compilation is major need for us. I am more 
> interested to know why Spark contributors/committers prefer SBT for day to 
> day development.
> 
> Any help and advice would help us to direct our evaluations in right 
> direction,
> 
> Thanks
> Swapnil

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Spark Data Frame. PreSorded partitions

2017-12-04 Thread Jörn Franke
Well, usually you sort only on a certain column and not on all columns, so most 
of the columns will always be unsorted; Spark may then still need to sort if 
you, for example, join (for some join types) on an unsorted column.

That being said, depending on the data you may not want to sort it, but rather 
cluster different column values together so they are close to each other. Maybe 
this clustering information could also be part of the data source API v2.
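
A small sketch of what such clustering looks like with the current writer API 
(bucketed tables only work via saveAsTable; table and column names are made up):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("bucketing-sketch")
  .enableHiveSupport()
  .getOrCreate()

val events = spark.read.parquet("/data/events") // hypothetical input

// Bucketing co-locates equal key values in the same files and records the
// sort order in the metastore, so later joins/aggregations on user_id can
// avoid a full shuffle or sort.
events.write
  .bucketBy(64, "user_id")
  .sortBy("user_id")
  .saveAsTable("events_bucketed")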

> On 4. Dec 2017, at 16:37, Li Jin <ice.xell...@gmail.com> wrote:
> 
> Just to give another data point: most of the data we use with Spark are 
> sorted on disk, having a way to allow data source to pass ordered distributed 
> to DataFrames is really useful for us.
> 
>> On Mon, Dec 4, 2017 at 9:12 AM, Николай Ижиков <nizhikov@gmail.com> 
>> wrote:
>> Hello, guys.
>> 
>> Thank you for answers!
>> 
>> > I think pushing down a sort  could make a big difference.
>> > You can however proposes to the data source api 2 to be included.
>> 
>> Jörn, are you talking about this jira issue? - 
>> https://issues.apache.org/jira/browse/SPARK-15689
>> Is there any additional documentation I has to learn before making any 
>> proposition?
>> 
>> 
>> 
>> 04.12.2017 14:05, Holden Karau пишет:
>>> I think pushing down a sort (or really more in the case where the data is 
>>> already naturally returned in sorted order on some column) could make a big 
>>> difference. Probably the simplest argument for a lot of time being spent 
>>> sorting (in some use cases) is the fact it's still one of the standard 
>>> benchmarks.
>>> 
>>> On Mon, Dec 4, 2017 at 1:55 AM, Jörn Franke <jornfra...@gmail.com 
>>> <mailto:jornfra...@gmail.com>> wrote:
>>> 
>>> I do not think that the data source api exposes such a thing. You can 
>>> however proposes to the data source api 2 to be included.
>>> 
>>> However there are some caveats , because sorted can mean two different 
>>> things (weak vs strict order).
>>> 
>>> Then, is really a lot of time lost because of sorting? The best thing 
>>> is to not read data that is not needed at all (see min/max indexes in 
>>> orc/parquet or bloom filters in Orc). What is not read
>>> does not need to be sorted. See also predicate pushdown.
>>> 
>>>  > On 4. Dec 2017, at 07:50, Николай Ижиков <nizhikov@gmail.com 
>>> <mailto:nizhikov@gmail.com>> wrote:
>>>  >
>>>  > Cross-posting from @user.
>>>  >
>>>  > Hello, guys!
>>>  >
>>>  > I work on implementation of custom DataSource for Spark Data Frame 
>>> API and have a question:
>>>  >
>>>  > If I have a `SELECT * FROM table1 ORDER BY some_column` query I can 
>>> sort data inside a partition in my data source.
>>>  >
>>>  > Do I have a built-in option to tell spark that data from each 
>>> partition already sorted?
>>>  >
>>>  > It seems that Spark can benefit from usage of already sorted 
>>> partitions.
>>>  > By using of distributed merge sort algorithm, for example.
>>>  >
>>>  > Does it make sense for you?
>>>  >
>>>  >
>>>  > 28.11.2017 18:42, Michael Artz пишет:
>>>  >> I'm not sure other than retrieving from a hive table that is 
>>> already sorted.  This sounds cool though, would be interested to know this 
>>> as well
>>>  >> On Nov 28, 2017 10:40 AM, "Николай Ижиков" <nizhikov@gmail.com 
>>> <mailto:nizhikov@gmail.com> <mailto:nizhikov@gmail.com 
>>> <mailto:nizhikov@gmail.com>>> wrote:
>>>  >>Hello, guys!
>>>  >>I work on implementation of custom DataSource for Spark Data 
>>> Frame API and have a question:
>>>  >>If I have a `SELECT * FROM table1 ORDER BY some_column` query I 
>>> can sort data inside a partition in my data source.
>>>  >>Do I have a built-in option to tell spark that data from each 
>>> partition already sorted?
>>>  >>It seems that Spark can benefit from usage of already sorted 
>>> partitions.
>>>  >>By using of distributed merge sort algorithm, for example.
>>>  >>Does it make sense for you?
>>>  >>
>>> -
>>>  >>To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>>> <mailto:user-unsubscr...@spark.apache.org> 
>>> <mailto:user-unsubscr...@spark.apache.org 
>>> <mailto:user-unsubscr...@spark.apache.org>>
>>>  >
>>>  > -
>>>  > To unsubscribe e-mail: dev-unsubscr...@spark.apache.org 
>>> <mailto:dev-unsubscr...@spark.apache.org>
>>>  >
>>> 
>>> -
>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org 
>>> <mailto:dev-unsubscr...@spark.apache.org>
>>> 
>>> 
>>> 
>>> 
>>> -- 
>>> Twitter: https://twitter.com/holdenkarau
>> 
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>> 
> 


Re: Spark Data Frame. PreSorded partitions

2017-12-04 Thread Jörn Franke
I do not think that the data source API exposes such a thing. You can however 
propose it for inclusion in the data source API v2.

However, there are some caveats, because sorted can mean two different things 
(weak vs. strict order).

Then, is a lot of time really lost because of sorting? The best thing is to not 
read data that is not needed at all (see min/max indexes in ORC/Parquet or 
bloom filters in ORC). What is not read does not need to be sorted. See also 
predicate pushdown.
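
A tiny illustration of the "do not read what you do not need" point with 
Parquet (path and column are made up); the filter shows up as PushedFilters in 
the plan and row groups whose min/max statistics exclude the predicate are 
skipped:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("pushdown-check").getOrCreate()
spark.conf.set("spark.sql.parquet.filterPushdown", "true") // the default, shown for clarity

val orders = spark.read.parquet("/data/orders")
val recent = orders.filter("order_date >= '2017-11-01'")
recent.explain() // look for PushedFilters: [GreaterThanOrEqual(order_date,...)]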

> On 4. Dec 2017, at 07:50, Николай Ижиков  wrote:
> 
> Cross-posting from @user.
> 
> Hello, guys!
> 
> I work on implementation of custom DataSource for Spark Data Frame API and 
> have a question:
> 
> If I have a `SELECT * FROM table1 ORDER BY some_column` query I can sort data 
> inside a partition in my data source.
> 
> Do I have a built-in option to tell spark that data from each partition 
> already sorted?
> 
> It seems that Spark can benefit from usage of already sorted partitions.
> By using of distributed merge sort algorithm, for example.
> 
> Does it make sense for you?
> 
> 
> 28.11.2017 18:42, Michael Artz пишет:
>> I'm not sure other than retrieving from a hive table that is already sorted. 
>>  This sounds cool though, would be interested to know this as well
>> On Nov 28, 2017 10:40 AM, "Николай Ижиков" > > wrote:
>>Hello, guys!
>>I work on implementation of custom DataSource for Spark Data Frame API 
>> and have a question:
>>If I have a `SELECT * FROM table1 ORDER BY some_column` query I can sort 
>> data inside a partition in my data source.
>>Do I have a built-in option to tell spark that data from each partition 
>> already sorted?
>>It seems that Spark can benefit from usage of already sorted partitions.
>>By using of distributed merge sort algorithm, for example.
>>Does it make sense for you?
>>-
>>To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>> 
> 
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: SparkSQL not support CharType

2017-11-23 Thread Jörn Franke
Or ByteType, depending on the use case.
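
So for the schema from the question, a version that works with the Row encoder 
would look roughly like this (using the built-in CSV reader of Spark 2.x):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("char-to-string").getOrCreate()

// StringType instead of CharType(1): CharType/VarcharType only exist for
// Hive/ORC compatibility and are not supported by the Row encoder.
val testSchema = StructType(Array(
  StructField("id", IntegerType, nullable = false),
  StructField("flag", StringType, nullable = false),
  StructField("time", DateType, nullable = false)))

val df = spark.read
  .schema(testSchema)
  .option("header", "false")
  .option("delimiter", ",")
  .csv("file:///Users/name/b")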

> On 23. Nov 2017, at 10:18, Herman van Hövell tot Westerflier 
>  wrote:
> 
> You need to use a StringType. The CharType and VarCharType are there to 
> ensure compatibility with Hive and ORC; they should not be used anywhere else.
> 
>> On Thu, Nov 23, 2017 at 4:09 AM, 163  wrote:
>> Hi,
>>  when I use Dataframe with table schema, It goes wrong:
>> 
>> val test_schema = StructType(Array(
>>   StructField("id", IntegerType, false),
>>   StructField("flag", CharType(1), false),
>>   StructField("time", DateType, false)));
>> 
>> val df = spark.read.format("com.databricks.spark.csv")
>>   .schema(test_schema)
>>   .option("header", "false")
>>   .option("inferSchema", "false")
>>   .option("delimiter", ",")
>>   .load("file:///Users/name/b")
>> 
>> The log is below:
>> Exception in thread "main" scala.MatchError: CharType(1) (of class 
>> org.apache.spark.sql.types.CharType)
>>  at 
>> org.apache.spark.sql.catalyst.encoders.RowEncoder$.org$apache$spark$sql$catalyst$encoders$RowEncoder$$serializerFor(RowEncoder.scala:73)
>>  at 
>> org.apache.spark.sql.catalyst.encoders.RowEncoder$$anonfun$2.apply(RowEncoder.scala:158)
>>  at 
>> org.apache.spark.sql.catalyst.encoders.RowEncoder$$anonfun$2.apply(RowEncoder.scala:157)
>> 
>> Why? Is this a bug?
>> 
>>  But I found spark will translate char type to string when using create 
>> table command:
>> 
>>   create table test(flag char(1));
>>  desc test:flag string;
>> 
>> 
>> 
>> 
>> Regards
>> Wendy He
> 
> 
> 
> -- 
> Herman van Hövell
> Software Engineer
> Databricks Inc.
> hvanhov...@databricks.com
> +31 6 420 590 27
> databricks.com
> 
> 
> 
> 


Re: is there a way for removing hadoop from spark

2017-11-12 Thread Jörn Franke
Within a CI/CD pipeline I use MiniDFSCluster and MiniYarnCluster if the 
production cluster also has HDFS and YARN - it has proven extremely useful and 
caught a lot of errors before going to the cluster (i.e. it saves a lot of 
money).

Cf. https://wiki.apache.org/hadoop/HowToDevelopUnitTests

Works fine.
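
A rough sketch of such a test fixture (it needs the hadoop-hdfs test artifact 
on the test classpath; names and paths are illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hdfs.MiniDFSCluster
import org.apache.spark.sql.SparkSession

// Spin up an in-process HDFS for the duration of the test suite.
val hdfsConf = new Configuration()
val cluster = new MiniDFSCluster.Builder(hdfsConf).numDataNodes(1).build()
cluster.waitClusterUp()
val fs = cluster.getFileSystem()
fs.mkdirs(new Path("/input"))

// Point Spark at the mini cluster via the usual fs.defaultFS URI.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("hdfs-integration-test")
  .config("spark.hadoop.fs.defaultFS", fs.getUri.toString)
  .getOrCreate()

// ... write test data, run the job under test, assert on the output ...

spark.stop()
cluster.shutdown()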

> On 13. Nov 2017, at 04:36, trs...@gmail.com wrote:
> 
> @Jörn Spark without Hadoop is useful
> For using sparks programming model on a single beefy instance
> For testing and integrating with a CI/CD pipeline.
> It's ugly to have tests which depend on a cluster running somewhere.
> 
> 
>> On Sun, 12 Nov 2017 at 17:17 Jörn Franke <jornfra...@gmail.com> wrote:
>> Why do you even mind?
>> 
>> > On 11. Nov 2017, at 18:42, Cristian Lorenzetto 
>> > <cristian.lorenze...@gmail.com> wrote:
>> >
>> > Considering the case i neednt hdfs, it there a way for removing completely 
>> > hadoop from spark?
>> > Is YARN the unique dependency in spark?
>> > is there no java or scala (jdk langs)YARN-like lib to embed in a project 
>> > instead to call external servers?
>> > YARN lib is difficult to customize?
>> >
>> > I made different questions for understanding what is the better way for me
>> 
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>> 


Re: is there a way for removing hadoop from spark

2017-11-12 Thread Jörn Franke
Why do you even mind?

> On 11. Nov 2017, at 18:42, Cristian Lorenzetto 
>  wrote:
> 
> Considering the case i neednt hdfs, it there a way for removing completely 
> hadoop from spark?
> Is YARN the unique dependency in spark? 
> is there no java or scala (jdk langs)YARN-like lib to embed in a project 
> instead to call external servers?
> YARN lib is difficult to customize?
> 
> I made different questions for understanding what is the better way for me

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Task failures and other problems

2017-11-09 Thread Jörn Franke
Maybe contact Oracle support?

Have you maybe accidentally configured some firewall rules? Routing issues? 
Maybe only on one of the nodes...





> On 9. Nov 2017, at 20:04, Jan-Hendrik Zab  wrote:
> 
> 
> Hello!
> 
> This might not be the perfect list for the issue, but I tried user@
> previously with the same issue, but with a bit less information to no
> avail.
> 
> So I'm hoping someone here can point me into the right direction.
> 
> We're using Spark 2.2 on CDH 5.13 (Hadoop 2.6 with patches) and a lot of
> our jobs fail, even when the jobs are super simple. For instance: [0]
> 
> We get two kinds of "errors", one where a task is actually marked as
> failed in the web ui [1]. Basically:
> 
> org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block:
> BP-1464899749-10.10.0.1-1378382027840:blk_1084837359_1099539407729
> file=/data/ia/derivatives/de/links/TA/part-68879.gz
> 
> See link for the stack trace.
> 
> When I check the block via "hdfs fsck -blockId blk_1084837359" all is
> well, I can also `-cat' the data into `wc'. It's a valid GZIP file.
> 
> The other kind of "error" we are getting are [2]:
> 
> DFSClient: DFS chooseDataNode: got # 1 IOException, will wait for 
> 2648.1420920453265 msec.
> BlockReaderFactory: I/O error constructing remote block reader.
> java.net.SocketException: Network is unreachable
> DFSClient: Failed to connect to /10.12.1.26:50010 for block, add to
> deadNodes and continue. java.net.SocketException: Network is unreachable
> 
> These are logged in the stderr of _some_ of the executors.
> 
> I know that both things (at least to me) look more like a problem with
> HDFS and/or CDH. But we tried reading data via mapred jobs that
> essentially just manually opened the GZIP files, read them and printed
> some status info and those didn't produce any kind of error. The only
> thing we noticed was that sometimes the read() call apparently stalled
> for several minutes. But we couldn't identify a cause so far. And we
> also didn't see any errors in the CDH logs except maybe the following
> informational messages:
> 
> Likely the client has stopped reading, disconnecting it 
> (node24.ib:50010:DataXceiver error processing READ_BLOCK operation  src: 
> /10.12.1.20:46518 dst: /10.12.1.24:50010); java.net.SocketTimeoutException: 
> 120004 millis timeout while waiting for channel to be ready for write. ch : 
> java.nio.channels.SocketChannel[connected local=/10.12.1.24:50010 
> remote=/10.12.1.20:46518]
> 
> All the systems (masters and nodes) can reach each other on the
> (infiniband) network. The systems communicate only over that one network
> (ie. datanodes only bind to one IP). /etc/hosts files are also the same
> on all systems and were distributed via ansible. But we also have a
> central DNS with the same data (and for PTR resolution) all systems are
> using.
> 
> The cluster has 37 nodes and 2 masters.
> 
> Suggestions are very welcome. :-)
> 
> [0] - http://www.l3s.de/~zab/link_converter.scala
> [1] - http://www.l3s.de/~zab/spark-errors-2.txt
> [2] - http://www.l3s.de/~zab/spark-errors.txt
> 
> Best,
>-jhz
> 
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Joining 3 tables with 17 billions records

2017-11-02 Thread Jörn Franke
Well, this sounds like a lot for “only” 17 billion. However, you can limit the 
resources of the job so that it does not need to take all of them (it might 
take a little bit longer).
Alternatively, did you try to use the HBase tables directly in Hive as external 
tables and do a simple CTAS? This works better if Hive is on Tez, but might 
also be worth a try with MR as the engine.

> On 2. Nov 2017, at 21:08, Chetan Khatri <chetan.opensou...@gmail.com> wrote:
> 
> Jorn,
> 
> This is kind of one time load from Historical Data to Analytical Hive engine. 
> Hive version 1.2.1 and Spark version 2.0.1 with MapR distribution.
> 
> Writing every table to parquet and reading it could be very much time 
> consuming, currently entire job could take ~8 hours on 8 node of 100 Gig  ram 
> 20 core cluster, not only used utilized by me but by larger team.
> 
> Thanks
> 
> 
>> On Fri, Nov 3, 2017 at 1:31 AM, Jörn Franke <jornfra...@gmail.com> wrote:
>> Hi,
>> 
>> Do you have a more detailed log/error message?
>> Also, can you please provide us details on the tables (no of rows, columns, 
>> size etc).
>> Is this just a one time thing or something regular?
>> If it is a one time thing then I would tend more towards putting each table 
>> in HDFS (parquet or ORC) and then join them.
>> What is the Hive and Spark version?
>> 
>> Best regards
>> 
>> > On 2. Nov 2017, at 20:57, Chetan Khatri <chetan.opensou...@gmail.com> 
>> > wrote:
>> >
>> > Hello Spark Developers,
>> >
>> > I have 3 tables that i am reading from HBase and wants to do join 
>> > transformation and save to Hive Parquet external table. Currently my join 
>> > is failing with container failed error.
>> >
>> > 1. Read table A from Hbase with ~17 billion records.
>> > 2. repartition on primary key of table A
>> > 3. create temp view of table A Dataframe
>> > 4. Read table B from HBase with ~4 billion records
>> > 5. repartition on primary key of table B
>> > 6. create temp view of table B Dataframe
>> > 7. Join both view of A and B and create Dataframe C
>> > 8.  Join Dataframe C with table D
>> > 9. coleance(20) to reduce number of file creation on already repartitioned 
>> > DF.
>> > 10. Finally store to external hive table with partition by skey.
>> >
>> > Any Suggestion or resources you come across please do share suggestions on 
>> > this to optimize this.
>> >
>> > Thanks
>> > Chetan
> 


Re: Joining 3 tables with 17 billions records

2017-11-02 Thread Jörn Franke
Hi,

Do you have a more detailed log/error message? 
Also, can you please provide us details on the tables (no of rows, columns, 
size etc).
Is this just a one time thing or something regular?
If it is a one time thing then I would tend more towards putting each table in 
HDFS (parquet or ORC) and then join them.
What is the Hive and Spark version?

Best regards

> On 2. Nov 2017, at 20:57, Chetan Khatri  wrote:
> 
> Hello Spark Developers,
> 
> I have 3 tables that i am reading from HBase and wants to do join 
> transformation and save to Hive Parquet external table. Currently my join is 
> failing with container failed error.
> 
> 1. Read table A from Hbase with ~17 billion records.
> 2. repartition on primary key of table A
> 3. create temp view of table A Dataframe
> 4. Read table B from HBase with ~4 billion records
> 5. repartition on primary key of table B
> 6. create temp view of table B Dataframe
> 7. Join both view of A and B and create Dataframe C
> 8.  Join Dataframe C with table D
> 9. coleance(20) to reduce number of file creation on already repartitioned DF.
> 10. Finally store to external hive table with partition by skey.
> 
> Any Suggestion or resources you come across please do share suggestions on 
> this to optimize this.
> 
> Thanks
> Chetan

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Anyone knows how to build and spark on jdk9?

2017-10-27 Thread Jörn Franke
Scala 2.12 is not yet supported on Spark - which also means no JDK 9:
https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-14220

If you look at Oracle support, JDK 9 is anyway only supported for 6 months. 
JDK 8 is LTS (5 years), JDK 18.3 will be only 6 months, and JDK 18.9 is LTS 
(5 years).
http://www.oracle.com/technetwork/java/eol-135779.html

I do not think Spark should support non-LTS releases. Especially for JDK 9 I do 
not see a strong technical need, but maybe I am overlooking something. Of 
course HTTP/2 etc. would be nice for the web interfaces, but that is currently 
not very urgent. 

> On 27. Oct 2017, at 04:44, Zhang, Liyun  wrote:
> 
> Thanks your suggestion, seems that scala 2.12.4 support jdk9
>  
> Scala 2.12.4 is now available.
> 
> Our benchmarks show a further reduction in compile times since 2.12.3 of 
> 5-10%.
> 
> Improved Java 9 friendliness, with more to come!
> 
>  
> Best Regards
> Kelly Zhang/Zhang,Liyun
>  
>  
>  
>  
>  
> From: Reynold Xin [mailto:r...@databricks.com] 
> Sent: Friday, October 27, 2017 10:26 AM
> To: Zhang, Liyun ; dev@spark.apache.org; 
> u...@spark.apache.org
> Subject: Re: Anyone knows how to build and spark on jdk9?
>  
> It probably depends on the Scala version we use in Spark supporting Java 9 
> first. 
>  
> On Thu, Oct 26, 2017 at 7:22 PM Zhang, Liyun  wrote:
> Hi all:
> 1.   I want to build spark on jdk9 and test it with Hadoop on jdk9 env. I 
> search for jiras related to JDK9. I only found SPARK-13278.  This means now 
> spark can build or run successfully on JDK9 ?
>  
>  
> Best Regards
> Kelly Zhang/Zhang,Liyun
>  


Re: Spark-XML maintenance

2017-10-26 Thread Jörn Franke
I would address Databricks with this issue - it is their repository.

> On 26. Oct 2017, at 18:43, comtef  wrote:
> 
> I've used spark for a couple of years and I found a way to contribute to the
> cause :).
> I've found a blocker in Spark XML extension
> (https://github.com/databricks/spark-xml). I'd like to know if this is the
> right place to discuss issues about this extension?
> 
> I've opened a PR to adress this problem but it's been open for a few months
> now without any review...
> 
> 
> 
> --
> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
> 
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Support Dynamic Partition Inserts params with SET command in Spark 2.0.1

2017-07-28 Thread Jörn Franke
Try sparksession.conf().set
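
i.e. roughly like the sketch below; depending on the version the value may also 
have to reach the Hive side (hive-site.xml or at session build time) for the 
metastore check to pick it up - the numbers are just examples:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  // setting it at build time is the safer variant if the runtime setter is ignored
  .config("hive.exec.max.dynamic.partitions", "2000")
  .config("hive.exec.max.dynamic.partitions.pernode", "2000")
  .getOrCreate()

// the session-level setter suggested above
spark.conf.set("hive.exec.max.dynamic.partitions", "2000")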

> On 28. Jul 2017, at 12:19, Chetan Khatri  wrote:
> 
> Hey Dev/ USer,
> 
> I am working with Spark 2.0.1 and with dynamic partitioning with Hive facing 
> below issue:
> 
> org.apache.hadoop.hive.ql.metadata.HiveException:
> Number of dynamic partitions created is 1344, which is more than 1000.
> To solve this try to set hive.exec.max.dynamic.partitions to at least 1344.
> 
> I tried below options, but failed:
> 
> val spark = sparkSession.builder().enableHiveSupport().getOrCreate()
> 
> spark.sqlContext.setConf("hive.exec.max.dynamic.partitions", "2000")
> 
> Please help with alternate workaround !
> 
> Thanks


Re: is there a way to persist the lineages generated by spark?

2017-04-06 Thread Jörn Franke
I do think this is the right way: you will have to do testing with test data, 
verifying that the expected output of the calculation matches the actual output. 
Even if the logical plan is correct, your calculation might not be. E.g. there 
can be bugs in Spark or in the UI, or (as happens very often) the client 
describes a calculation but in the end the description is wrong.
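
A minimal sketch of that kind of test - a tiny known input, the production 
transformation, and an assertion on the expected output; the plan itself can 
additionally be dumped as text if an audit trail is wanted (function and column 
names are made up):

import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().master("local[2]").appName("calc-test").getOrCreate()
import spark.implicits._

// The transformation under test (hypothetical business rule).
def dailyRevenue(orders: DataFrame): DataFrame =
  orders.groupBy("day").sum("amount").withColumnRenamed("sum(amount)", "revenue")

val input = Seq(("2017-04-01", 10.0), ("2017-04-01", 5.0), ("2017-04-02", 7.0))
  .toDF("day", "amount")

val result = dailyRevenue(input).collect()
  .map(r => (r.getString(0), r.getDouble(1))).toMap
assert(result == Map("2017-04-01" -> 15.0, "2017-04-02" -> 7.0))

// The logical/physical plans can be kept as text for documentation purposes.
val planText = dailyRevenue(input).queryExecution.toString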

> On 4. Apr 2017, at 05:19, kant kodali  wrote:
> 
> Hi All,
> 
> I am wondering if there a way to persist the lineages generated by spark 
> underneath? Some of our clients want us to prove if the result of the 
> computation that we are showing on a dashboard is correct and for that If we 
> can show the lineage of transformations that are executed to get to the 
> result then that can be the Q.E.D moment but I am not even sure if this is 
> even possible with spark?
> 
> Thanks,
> kant

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: spark sql versus interactive hive versus hive

2017-02-11 Thread Jörn Franke
I think this is a rather simplistic view. All the tools do computation 
in-memory in the end. For certain types of computation and usage patterns it 
makes sense to keep the data in memory. For example, most machine learning 
approaches require including the same data in several iterative calculations. 
This is what Spark has been designed for. Most aggregations/precalculations are 
just done by passing over the data in-memory once. This is where Hive+Tez and, 
to a limited extent, Spark can help. The third pattern, where users 
interactively query the data, i.e. many concurrent users query the same or 
similar data very frequently, is addressed by Hive on Tez + LLAP, Hive on Tez + 
Ignite, or Spark + Ignite (and there are other tools).

So it is important to understand what your users want to do.

Then, you have a lot of benchmark data on the web to compare against. However, 
I always recommend generating or using data yourself that matches the data the 
company is using. Keep also in mind that time is needed to convert this data 
into an efficient format.
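
For example, converting the raw test data once into an efficient columnar 
format before measuring anything is usually the first step (paths and table are 
illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("benchmark-prep").getOrCreate()

// One-off conversion: raw CSV -> Parquet (or ORC if Hive+LLAP is in the mix).
spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/benchmark/raw/lineitem.csv")
  .write
  .mode("overwrite")
  .parquet("/benchmark/parquet/lineitem")

// The queries under test should then run against the converted data.
val lineitem = spark.read.parquet("/benchmark/parquet/lineitem")
lineitem.createOrReplaceTempView("lineitem")
spark.sql("SELECT COUNT(*) FROM lineitem WHERE l_quantity > 40").show()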

> On 10 Feb 2017, at 20:36, Saikat Kanjilal  wrote:
> 
> Folks,
> 
> I'm embarking on a project to build a POC around spark sql, I was wondering 
> if anyone has experience in comparing spark sql with hive or interactive hive 
> and data points around the types of queries suited for both, I am naively 
> assuming that spark sql will beat hive in all queries given that computations 
> are mostly done in memory but want to hear some more data  points around 
> queries that maybe problematic in spark-sql, also are there debugging tools 
> people ordinarily use with spark-sql to troubleshoot perf related issues.
> 
> 
> I look forward to hearing from the community.
> 
> Regards


Re: Maximum limit for akka.frame.size be greater than 500 MB ?

2017-01-29 Thread Jörn Franke
Which Spark version are you using? What are you trying to do exactly and what 
is the input data? As far as I know, akka has been dropped in recent Spark 
versions.
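
For what it is worth, on versions where Akka is gone the corresponding knob is 
spark.rpc.message.maxSize (in MB, and bounded), so coalescing to fewer 
partitions remains the more robust fix - a hedged sketch:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// spark.rpc.message.maxSize took over the role of spark.akka.frameSize once
// Akka was removed; very large map-output statuses are still better addressed
// by reducing the number of partitions.
val conf = new SparkConf()
  .setAppName("large-shuffle-job")
  .set("spark.rpc.message.maxSize", "256")

val spark = SparkSession.builder().config(conf).getOrCreate()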

> On 30 Jan 2017, at 00:44, aravasai  wrote:
> 
> I have a spark job running on 2 terabytes of data which creates more than
> 30,000 partitions. As a result, the spark job fails with the error 
> "Map output statuses were 170415722 bytes which exceeds spark.akka.frameSize
> 52428800 bytes" (For 1 TB data)
> However, when I increase the akka.frame.size to around 500 MB, the job hangs
> with no further progress.
> 
> So, what is the ideal or maximum limit that i can assign akka.frame.size so
> that I do not get the error of map output statuses exceeding limit for large
> chunks of data ?
> 
> Is coalescing the data into smaller number of partitions the only solution
> to this problem? Is there any better way than coalescing many intermediate
> rdd's in program ?
> 
> My driver memory: 10G
> Executor memory: 36G 
> Executor memory overhead : 3G
> 
> 
> 
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Maximum-limit-for-akka-frame-size-be-greater-than-500-MB-tp20793.html
> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: MLlib mission and goals

2017-01-24 Thread Jörn Franke
I also agree with Joseph and Sean.
With respect to spark-packages, I think the issue is that you have to manually 
add it, although it basically fetches the package from Maven Central (or a 
custom upload).

From an organizational perspective there are other issues. E.g. you have to 
download it from the internet instead of using an artifact repository within 
the enterprise. You do not want users to download arbitrary packages from the 
internet into a production cluster. You also want to make sure that they do not 
use outdated or snapshot versions, and that you have control over dependencies, 
licenses, etc.

Currently I do not see the big artifact repository managers supporting Spark 
packages anytime soon. I also do not see it coming from the big Hadoop 
distributions.


> On 24 Jan 2017, at 11:37, Sean Owen  wrote:
> 
> My $0.02, which shouldn't be weighted too much.
> 
> I believe the mission as of Spark ML has been to provide the framework, and 
> then implementation of 'the basics' only. It should have the tools that cover 
> ~80% of use cases, out of the box, in a pretty well-supported and tested way.
> 
> It's not a goal to support an arbitrarily large collection of algorithms 
> because each one adds marginally less value, and IMHO, is proportionally 
> bigger baggage, because the contributors tend to skew academic, produce worse 
> code, and don't stick around to maintain it. 
> 
> The project is already generally quite overloaded; I don't know if there's 
> bandwidth to even cover the current scope. While 'the basics' is a subjective 
> label, de facto, I think we'd have to define it as essentially "what we 
> already have in place" for the foreseeable future.
> 
> That the bits on spark-packages.org aren't so hot is not a problem but a 
> symptom. Would these really be better in the core project?
> 
> And, or: I entirely agree with Joseph's take.
> 
>> On Tue, Jan 24, 2017 at 1:03 AM Joseph Bradley  wrote:
>> This thread is split off from the "Feedback on MLlib roadmap process 
>> proposal" thread for discussing the high-level mission and goals for MLlib.  
>> I hope this thread will collect feedback and ideas, not necessarily lead to 
>> huge decisions.
>> 
>> Copying from the previous thread:
>> 
>> Seth:
>> """
>> I would love to hear some discussion on the higher level goal of Spark MLlib 
>> (if this derails the original discussion, please let me know and we can 
>> discuss in another thread). The roadmap does contain specific items that 
>> help to convey some of this (ML parity with MLlib, model persistence, 
>> etc...), but I'm interested in what the "mission" of Spark MLlib is. We 
>> often see PRs for brand new algorithms which are sometimes rejected and 
>> sometimes not. Do we aim to keep implementing more and more algorithms? Or 
>> is our focus really, now that we have a reasonable library of algorithms, to 
>> simply make the existing ones faster/better/more robust? Should we aim to 
>> make interfaces that are easily extended for developers to easily implement 
>> their own custom code (e.g. custom optimization libraries), or do we want to 
>> restrict things to out-of-the box algorithms? Should we focus on more 
>> flexible, general abstractions like distributed linear algebra?
>> 
>> I was not involved in the project in the early days of MLlib when this 
>> discussion may have happened, but I think it would be useful to either 
>> revisit it or restate it here for some of the newer developers.
>> """
>> 
>> Mingjie:
>> """
>> +1 general abstractions like distributed linear algebra.
>> """
>> 
>> 
>> I'll add my thoughts, starting with our past trajectory:
>> * Initially, MLlib was mainly trying to build a set of core algorithms.
>> * Two years ago, the big effort was adding Pipelines.
>> * In the last year, big efforts have been around completing Pipelines and 
>> making the library more robust.
>> 
>> I agree with Seth that a few immediate goals are very clear:
>> * feature parity for DataFrame-based API
>> * completing and improving testing for model persistence
>> * Python, R parity
>> 
>> In the future, it's harder to say, but if I had to pick my top 2 items, I'd 
>> list:
>> 
>> (1) Making MLlib more extensible
>> It will not be feasible to support a huge number of algorithms, so allowing 
>> users to customize their ML on Spark workflows will be critical.  This is 
>> IMO the most important thing we could do for MLlib.
>> Part of this could be building a healthy community of Spark Packages, and we 
>> will need to make it easier for users to write their own algorithms and 
>> packages to facilitate this.  Part of this could be allowing users to 
>> customize existing algorithms with custom loss functions, etc.
>> 
>> (2) Consistent improvements to core algorithms
>> A less exciting but still very important item will be constantly improving 
>> the core set of algorithms in MLlib. This could mean speed, scaling, 
>> robustness, and 

Re: SPARK-18689: A proposal for priority based app scheduling utilizing linux cgroups.

2016-12-15 Thread Jörn Franke
Hi,

What about YARN or Mesos used in combination with Spark? They also have cgroup 
support. Or a Kubernetes etc. deployment.

> On 15 Dec 2016, at 17:37, Hegner, Travis  wrote:
> 
> Hello Spark Devs,
> 
> 
> I have finally completed a mostly working proof of concept. I do not want to 
> create a pull request for this code, as I don't believe it's production 
> worthy at the moment. My intent is to better communicate what I'd like to 
> accomplish. Please review the following patch: 
> https://github.com/apache/spark/compare/branch-2.0...travishegner:cgroupScheduler.
> 
> 
> What the code does:
> 
> Currently, it exposes two options "spark.cgroups.enabled", which defaults to 
> false, and "spark.executor.shares" which defaults to None. When cgroups mode 
> is enabled, a single executor is created on each worker, with access to all 
> cores. The worker will create a parent cpu cgroup (on first executor launch) 
> called "spark-worker" to house any executors that it launches. Each executor 
> is put into it's own cgroup named with the app id, under the parent cgroup. 
> The cpu.shares parameter is set to the value in "spark.executor.shares", if 
> this is "None", it inherits the value from the parent cgroup.
> 
> 
> Tested on Ubuntu 16:04 (docker containers), kernel 4.4.0-53-generic: I have 
> not run unit tests. I do not know if/how cgroups v2 (kernel 4.5) is going to 
> change this code base, but it looks like the kernel interface is the same for 
> the most part.
> 
> 
> I was able to launch a spark shell which consumed all cores in the cluster, 
> but sat idle. I was then able to launch an application (client deploy-mode) 
> which was also allocated all cores in the cluster, and ran to completion 
> unhindered. Each of the executors on each worker was properly placed into 
> it's respective cgroup, which in turn had the correct cpu.shares value 
> allocated.
> 
> 
> What the code still needs:
> 
> 
> * Documentation (assuming the community moves forward with some kind of 
> implementation)
> 
> * Sometimes the cgroups get destroyed after app completion, sometimes they 
> don't. (need to put `.destroy()` call in a `finally` block., or in the 
> `maybeCleanupApplication()` method; what do you think?)
> 
> * Proper handling of drivers's resources when running `--deploy-mode cluster`
> * Better web UI indication of cgroup mode or core sharing (currently just 
> shows up as an over allocation of cores per worker)
> 
> * Better environment/os/platform detection and testing (I won't be surprised 
> if there is something broken if trying to run this on a different OS)
> 
> * Security/permissions for cgroups if running worker as non-root (perhaps 
> creating the parent cgroup with correct permissions before launching the 
> worker is all that is necessary)
> 
>   - running the worker in a container currently requires --privileged mode (I 
> haven't figured out if/what capability makes cgroups writable, or if it's 
> possible to use a new cgroup mount point)
> 
> * More user defined options
> 
>   - cgroup root path (currently hard coded)
> 
>   - driver cpu.shares (for cluster deploy-mode: would require a specially 
> named cgroup... s"$appId-driver" ? default same #shares as executor? default 
> double shares?
> 
>   - parent cpu.shares (currently os default)
> 
>   - parent cgroup name (currently hard coded)
> 
> 
> 
> I tried to structure the initial concept to make it easy to add support for 
> more cgroup features (cpuset, mem, etc...), should the community feel there 
> is value in adding them. Linux cgroups are an extremely powerful resource 
> allocation and isolation tool, and this patch is only scratching the surface 
> of their general capabilities. Of course, as Mr. Loughran's points out, 
> expanding into these features will require more code maintenance, but not 
> enough that we should shy away from it.
> 
> 
> 
> 
> 
> I personally believe that any multi-node resource allocation system should 
> offload as much of the scheduling and resource allocation as possible to the 
> underlying kernel within the node level. Each node's own kernel is the best 
> equipped place to manage those resources. Only the node's kernel can allocate 
> a few seconds worth of cpu to the low priority app, while the high priority 
> app is waiting on disk I/O, and instantly give it back to the high priority 
> app when it needs it, with (near) real-time granularity
> 
> 
> 
> The multi-node system should set up a proper framework to give each node's 
> kernel the information it needs to allocate the resources correctly. 
> Naturally, the system should allow resource reservations, and even limits, 
> for the purposes of meeting and testing for SLAs and worst case scenarios as 
> well. Linux cgroups are capable of doing those things in a near real-time 
> fashion.
> 
> 
> 
> With a proper convention of priorities/shares for applications within an 
> organization, I believe that everyone can get better 

Re: Dynamic Graph Handling

2016-10-24 Thread Jörn Franke
Maybe TitanDB?! It uses HBase to store graphs and Solr (on HDFS) to index 
graphs. I am not 100% sure it supports it, but probably.
It can also integrate with Spark, but only for analytics on a given graph.
Otherwise you need to go for a dedicated graph system.

> On 24 Oct 2016, at 16:41, Marco  wrote:
> 
> Hi,
> 
> I'm a student in Computer Science and I'm working for my master thesis 
> on the Graph Partitioning problem, focusing on dynamic graphs.
> 
> I'm searching for a framework to manage dynamic graphs, with possible 
> disappearing edges/nodes. Now the problem is: GraphX alone cannot 
> provide a solution to manage this kind of graph, and searching through the 
> internet I didn't find anything relevant. Is there a framework or a 
> way to handle dynamic graphs?
> 
> Thanks in advance
> 
> Marco Rocchi
> 
> 
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Memory usage by Spark jobs

2016-09-22 Thread Jörn Franke
You should also take into account that Spark has different options to represent 
data in-memory, such as Java serialized objects, Kryo serialized, Tungsten 
(columnar, optionally compressed), etc. The Tungsten footprint depends heavily 
on the underlying data and sorting, especially if compressed.
Then, you might also think about broadcasted data etc.

As such I am not aware of a specific guide, but there is also no magic behind 
it. Could be a good JIRA task :) 
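
A small sketch of how those options surface in user code - Kryo for serialized 
objects and an explicitly serialized storage level for RDD caching (class and 
path are hypothetical):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

case class LineItem(orderKey: Long, quantity: Double, shipDate: String)

// Kryo typically shrinks serialized RDD storage compared to Java serialization.
val conf = new SparkConf()
  .setAppName("memory-footprint-check")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[LineItem]))

val spark = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._

// DataFrames/Datasets cached via .cache() use Tungsten's compressed columnar
// batches; plain RDDs only become compact with a serialized storage level.
val itemsRdd = spark.read.parquet("/data/lineitem").as[LineItem].rdd
itemsRdd.persist(StorageLevel.MEMORY_ONLY_SER)
itemsRdd.count() // materialise; sizes then show up under the Storage tab of the UI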

> On 22 Sep 2016, at 08:36, Hemant Bhanawat  wrote:
> 
> I am working on profiling TPCH queries for Spark 2.0.  I see lot of temporary 
> object creation (sometimes size as much as the data size) which is justified 
> for the kind of processing Spark does. But, from production perspective, is 
> there a guideline on how much memory should be allocated for processing a 
> specific data size of let's say parquet data? Also, has someone investigated 
> memory usage for the individual SQL operators like Filter, group by, order 
> by, Exchange etc.? 
> 
> Hemant Bhanawat
> www.snappydata.io 


Re: Bitmap Indexing to increase OLAP query performance

2016-06-29 Thread Jörn Franke

Is it the traditional bitmap indexing? I would not recommend it for big data. 
You could use bloom filters and min/max indexes in-memory, which look to be more 
appropriate. That said, if you want to use bitmap indexes then you would have to 
do it as you say. Note, however, that bitmap indexes may consume a lot of 
memory, so I am not sure that simply caching them in-memory is desirable. 

> On 29 Jun 2016, at 19:49, Nishadi Kirielle  wrote:
> 
> Hi All,
> 
> I am a CSE undergraduate and as for our final year project, we are expecting 
> to construct a cluster based, bit-oriented analytic platform (storage engine) 
> to provide fast query performance when used for OLAP with the use of novel 
> bitmap indexing techniques when and where appropriate. 
> 
> For that we are expecting to use Spark SQL. We will need to implement a way 
> to cache the bit map indexes and in-cooperate the use of bitmap indexing at 
> the catalyst optimizer level when it is possible.
> 
> I would highly appreciate your feedback regarding the proposed approach.
> 
> Thank you & Regards
> 
> Nishadi Kirielle
> Department of Computer Science and Engineering
> University of Moratuwa
> Sri Lanka 

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Question about Bloom Filter in Spark 2.0

2016-06-22 Thread Jörn Franke
You should look at it at both levels: there is one bloom filter for the ORC 
data (on disk) and one for data in-memory. 

It is already a good step towards an integration of format and in-memory 
representation for columnar data. 
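
For the in-memory side (the SPARK-12818 part mentioned below), the DataFrame 
stat functions build such a filter from existing data - a small sketch with 
made-up names:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("bloom-filter-sketch").getOrCreate()
val users = spark.read.orc("/data/users")

// Build an in-memory Bloom filter over a column: 1M expected items, 3% false
// positive probability.
val bf = users.stat.bloomFilter("user_id", 1000000L, 0.03)

bf.mightContain(42L)   // false means "definitely not present"
bf.mightContain(4711L) // true means "possibly present"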

> On 22 Jun 2016, at 14:01, BaiRan  wrote:
> 
> After building bloom filter on existing data, does spark engine utilise bloom 
> filter during query processing?
> Is there any plan about predicate push down by using bloom filter in ORC / 
> Parquet?
> 
> Thanks
> Ran
>> On 22 Jun, 2016, at 10:48 am, Reynold Xin  wrote:
>> 
>> SPARK-12818 is about building a bloom filter on existing data. It has 
>> nothing to do with the ORC bloom filter, which can be used to do predicate 
>> pushdown.
>> 
>> 
>>> On Tue, Jun 21, 2016 at 7:45 PM, BaiRan  wrote:
>>> Hi all,
>>> 
>>> I have a question about bloom filter implementation in Spark-12818 issue. 
>>> If I have a ORC file with bloom filter metadata, how can I utilise it by 
>>> Spark SQL?
>>> Thanks.
>>> 
>>> Best,
>>> Ran
> 


Re: Structured Streaming partition logic with respect to storage and fileformat

2016-06-21 Thread Jörn Franke
It is based on the underlying Hadoop FileFormat, which determines splits mostly 
based on the block size. You can change this, though.
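
Concretely, for the file-based sources the target split size can be tuned; it 
applies per micro-batch of the file stream source as well (values and paths are 
only examples, and the streaming reader needs an explicit schema):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("split-size-sketch").getOrCreate()

// Default is 128 MB per partition for file-based sources; lowering it yields
// more, smaller partitions (and vice versa).
spark.conf.set("spark.sql.files.maxPartitionBytes", 32L * 1024 * 1024)

val jsonSchema = spark.read.json("/Users/sachin/testSpark/inputJson").schema
val dsJson = spark.readStream.schema(jsonSchema).json("/Users/sachin/testSpark/inputJson")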

> On 21 Jun 2016, at 12:19, Sachin Aggarwal  wrote:
> 
> 
> when we use readStream to read data as Stream, how spark decides the no of 
> RDD and partition within each RDD with respect to storage and file format.
> 
> val dsJson = sqlContext.readStream.json("/Users/sachin/testSpark/inputJson")
> 
> val dsCsv = 
> sqlContext.readStream.option("header","true").csv("/Users/sachin/testSpark/inputCsv")
> val ds = sqlContext.readStream.text("/Users/sachin/testSpark/inputText")
> val dsText = ds.as[String].map(x =>(x.split(" ")(0),x.split(" 
> ")(1))).toDF("name","age")
> 
> val dsParquet = 
> sqlContext.readStream.format("parquet").parquet("/Users/sachin/testSpark/inputParquet")
> 
> 
> -- 
> 
> Thanks & Regards
> 
> Sachin Aggarwal
> 7760502772


Re: Spark performance comparison for research

2016-02-29 Thread Jörn Franke
I am not sure what you are comparing here. You would need to provide additional 
details, such as the algorithms and functionality supported by your framework. 
For instance, Spark has built-in fault tolerance and is a generic framework, 
which is an advantage with respect to development and operations, but may be a 
disadvantage in certain use cases with respect to performance.
Another concern is the SDN, which could be configured in a way that is 
disadvantageous for your approach or for Spark. I would not use it for a 
generic performance comparison, except if it is the production network of your 
company and you only want to compare for your company.

I doubt that focusing only on performance for a framework makes scientific 
sense. 
Your approach sounds too simple to be of scientific value; it sounds more 
suited for unscientific marketing purposes. That being said, it could be that 
you did not provide all the details.

> On 01 Mar 2016, at 06:25, yasincelik  wrote:
> 
> Hello,
> 
> I am working on a project as a part of my research. The system I am working
> on is basically an in-memory computing system. I want to compare its
> performance with Spark. Here is how I conduct experiments. For my project: I
> have a software defined network(SDN) that allows HPC applications to share
> data, such as sending and receiving messages through this network. For
> example, in a word count application, a master reads a 10GB text file from
> hard drive, slices into small chunks, and distribute the chunks. Each worker
> will fetch some chunks, process them, and send them back to the SDN. Then
> master collects the results.
> 
> To compare with Spark, I run word count application. I run Spark in
> standalone mode. I do not use any cluster manager. There is no pre-installed
> HDFS. I use PBS to reserve nodes, which gives me list of nodes. Then I
> simply run Spark on these nodes. Here is the command to run Spark:
> ~/SPARK/bin/spark-submit --class word.JavaWordCount  --num-executors 1
> spark.jar ~/data.txt  > ~/wc
> 
> Technically, these experiments are run under same conditions. Read file, cut
> it into small chunks, distribute chunks, process chunks, collect results.
> Do you think this is a reasonable comparison? Can someone make this claim:
> "Well, Spark is designed to work on top of HDFS, in which the data is
> already stored in nodes, and Spark jobs are submitted to these nodes to take
> advantage of data locality"
> 
> 
> Any comment is appreciated.
> 
> Thanks
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-performance-comparison-for-research-tp16498.html
> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Concurreny does not improve for Spark Jobs with Same Spark Context

2016-02-18 Thread Jörn Franke
How did you configure the YARN queues? Which scheduler? Preemption?

> On 19 Feb 2016, at 06:51, Prabhu Joseph  wrote:
> 
> Hi All,
> 
>When running concurrent Spark Jobs on YARN (Spark-1.5.2) which share a 
> single Spark Context, the jobs take more time to complete comparing with when 
> they ran with different Spark Context.
> The spark jobs are submitted on different threads.
> 
> Test Case: 
>   
> A.  3 spark jobs submitted serially
> B.  3 spark jobs submitted concurrently and with different SparkContext
> C.  3 spark jobs submitted concurrently and with same Spark Context
> D.  3 spark jobs submitted concurrently and with same Spark Context and 
> tripling the resources.
> 
> A and B takes equal time, But C and D are taking 2-3 times longer than A, 
> which shows concurrency does not improve with shared Spark Context. [Spark 
> Job Server]
> 
> Thanks,
> Prabhu Joseph

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Re: Spark 1.6.0 + Hive + HBase

2016-01-28 Thread Jörn Franke
Probably a newer Hive version makes a lot of sense here - at least 1.2.1. What 
storage format are you using?
I think the old Hive version had a bug where it always scanned all partitions 
unless you limited it in the ON clause of the query to a certain partition 
(e.g. ON date=20201119).
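
i.e. constrain the partition column to a literal and check the plan - with a 
recent Hive/Spark the scan should then list only the matching partitions (table 
and column names below are hypothetical):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// Restricting the partition column up front lets the metastore prune
// partitions instead of scanning all of them.
val day = spark.sql(
  "SELECT COUNT(*) FROM dwh.sessions WHERE added_date = 20160128")
day.explain(true) // the scan should list only the selected partition(s)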

> On 28 Jan 2016, at 13:42, 开心延年  wrote:
> 
> 
> Is there anybody who can solve Problem 4)? Thanks.
> Problem 4)
> Spark don't push down predicates for HiveTableScan, which means that every 
> query is full scan.
> 
> 
> 
> -- Original message --
> From: "Julio Antonio Soto de Vicente";;
> Sent: Thursday, 28 January 2016, 8:09 PM
> To: "Maciej Bryński";
> Cc: "Ted Yu"; "dev";
> Subject: Re: Spark 1.6.0 + Hive + HBase
> 
> Hi,
> 
> Indeed, Hive is not able to perform predicate pushdown through a HBase table. 
> Nor Hive or Impala can.
> 
> Broadly speaking, if you need to query your  HBase table through a field 
> other than de rowkey:
> 
> A) Try to "encode" as much info as possible in the rowkey field and use it as 
> your predicate, or
> B) Feel free to use another kind of storage system, or create coprocessors in
> order to build a secondary index.
> 
> 
>> El 28 ene 2016, a las 12:56, Maciej Bryński  escribió:
>> 
>> Ted,
>> You're right.
>> hbase-site.xml resolved problems 2 and 3, but...
>> 
>> Problem 4)
>> Spark doesn't push down predicates for HiveTableScan, which means that every
>> query is a full scan.
>> 
>> == Physical Plan ==
>> TungstenAggregate(key=[], 
>> functions=[(count(1),mode=Final,isDistinct=false)], output=[count#144L])
>> +- TungstenExchange SinglePartition, None
>>+- TungstenAggregate(key=[], 
>> functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#147L])
>>   +- Project
>>  +- Filter (added_date#141L >= 20160128)
>> +- HiveTableScan [added_date#141L], MetastoreRelation 
>> dwh_diagnostics, sessions_hbase, None
>> 
>> Is there any magic option to make this work?
>> 
>> Regards,
>> Maciek
>> 
>> 2016-01-28 10:25 GMT+01:00 Ted Yu :
>>> For the last two problems, hbase-site.xml does not seem to be on the classpath.
>>> 
>>> Once hbase-site.xml is put on the classpath, you should be able to make
>>> progress.
>>> 
>>> Cheers
>>> 
 On Jan 28, 2016, at 1:14 AM, Maciej Bryński  wrote:
 
 Hi,
 I'm trying to run SQL query on Hive table which is stored on HBase.
 I'm using:
 - Spark 1.6.0
 - HDP 2.2
 - Hive 0.14.0
 - HBase 0.98.4
 
 I managed to configure working classpath, but I have following problems:
 
 1) I have a UDF defined in the Hive Metastore (FUNCS table).
 Spark cannot use it...
 
  File "/opt/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, 
 in get_return_value
 py4j.protocol.Py4JJavaError: An error occurred while calling o51.sql.
 : org.apache.spark.sql.AnalysisException: undefined function 
 dwh.str_to_map_int_str; line 55 pos 30
 at 
 org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$2$$anonfun$1.apply(hiveUDFs.scala:69)
 at 
 org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$2$$anonfun$1.apply(hiveUDFs.scala:69)
 at scala.Option.getOrElse(Option.scala:120)
 at 
 org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$2.apply(hiveUDFs.scala:68)
 at 
 org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$2.apply(hiveUDFs.scala:64)
 at scala.util.Try.getOrElse(Try.scala:77)
 at 
 org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUDFs.scala:64)
 at 
 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$12$$anonfun$applyOrElse$5$$anonfun$applyOrElse$24.apply(Analyzer.scala:574)
 at 
 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$12$$anonfun$applyOrElse$5$$anonfun$applyOrElse$24.apply(Analyzer.scala:574)
 at 
 org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:48)
 at 
 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$12$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:573)
 at 
 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$12$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:570)
 
 
 2) When I use SQL without this function, Spark tries to connect to
 ZooKeeper on localhost.
 I made a tunnel from localhost to one of the ZooKeeper servers, but that's
 not a real solution.
 
 16/01/28 10:09:18 INFO ZooKeeper: Client 
 environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
 16/01/28 10:09:18 INFO ZooKeeper: Client environment:host.name=j4.jupyter1
 

Re: OLAP query using spark dataframe with cassandra

2015-11-08 Thread Jörn Franke

Is there any distributor supporting these software components in combination?
If not, and your core business is not software, then you may want to look for
something else, because it might not make sense to build up internal know-how
in all of these areas.

In any case, it all depends highly on your data and queries. You will have to
do your own experiments.
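For those experiments, here is a minimal sketch of the DataFrame path being discussed. It assumes the DataStax spark-cassandra-connector is on the classpath, an existing SparkContext named sc, and invented keyspace, table, and column names.

  import org.apache.spark.sql.SQLContext

  val sqlContext = new SQLContext(sc)
  // read a Cassandra table as a DataFrame via the connector's data source
  val events = sqlContext.read
    .format("org.apache.spark.sql.cassandra")
    .options(Map("keyspace" -> "metrics", "table" -> "daily_events"))
    .load()

  // the kind of daily aggregation mentioned above
  val rollup = events
    .groupBy("event_date", "event_type")
    .count()
  rollup.show()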

> On 09 Nov 2015, at 07:02, "fightf...@163.com"  wrote:
> 
> Hi, community
> 
> We are especially interested in this integration, based on some slides from [1].
> The SMACK (Spark + Mesos + Akka + Cassandra + Kafka) stack seems like a good
> implementation of the lambda architecture in the open-source world, especially
> for non-Hadoop-based cluster environments. As we can see, the advantages
> obviously consist of:
> 
> 1. the feasibility and scalability of the Spark DataFrame API, which is also a
> perfect complement to Apache Cassandra's native CQL features.
> 
> 2. availability of both streaming and batch processing across the whole stack,
> which is cool.
> 
> 3. we can achieve both compactness and usability for Spark with Cassandra,
> including seamless integration with job scheduling and resource management.
> 
> Our only concern is the OLAP query performance issue, which is mainly caused by
> frequent aggregation work over large, daily-growing tables, for both Spark SQL
> and Cassandra. I can see that the use case in [1] uses FiloDB to achieve
> columnar storage and query performance, but we have no further knowledge of it.
> 
> The question is: has anyone had such a use case, especially in a production
> environment? We would be interested in your architecture for designing this
> OLAP engine using Spark + Cassandra. And what do you think of this scenario
> compared with a traditional OLAP cube design, like Apache Kylin or Pentaho
> Mondrian?
> 
> Best Regards,
> 
> Sun.
> 
> 
> [1]  
> http://www.slideshare.net/planetcassandra/cassandra-summit-2014-interactive-olap-queries-using-apache-cassandra-and-spark
> 
> fightf...@163.com


Re: Need advice on hooking into Sql query plan

2015-11-05 Thread Jörn Franke
Would it be possible to use views to address some of your requirements?

Alternatively, it might be better to parse the SQL yourself. There are open-source
libraries for that if you really need a complete SQL parser. Do you want to do
it on subqueries as well?

> On 05 Nov 2015, at 23:34, Yana Kadiyska  wrote:
> 
> Hi folks, not sure if this belongs to the dev or user list... sending to dev as
> it seems a bit convoluted.
> 
> I have a UI in which we allow users to write ad-hoc queries against a (very 
> large, partitioned) table. I would like to analyze the queries prior to 
> execution for two purposes:
> 
> 1. Reject under-constrained queries (i.e. there is a field predicate that I 
> want to make sure is always present)
> 2. Augment the query with additional predicates (e.g if the user asks for a 
> student_id I also want to push a constraint on another field)
> 
> I could parse the sql string before passing to spark but obviously spark 
> already does this anyway. Can someone give me general direction on how to do 
> this (if possible).
> 
> Something like
> 
> myDF = sql("user_sql_query")
> myDF.queryExecution.logical  //here examine the filters provided by user, 
> reject if underconstrained, push new filters as needed (via withNewChildren?)
>  
> at this point, with some luck, I'd have a new LogicalPlan -- what is the proper
> way to create an execution plan on top of this new plan? I'm looking at this
> https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala#L329
>  but this method is restricted to the package. I'd really prefer to hook into 
> this as early as possible and still let spark run the plan optimizations as 
> usual.
> 
> Any guidance or pointers much appreciated.
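A minimal, hypothetical sketch of the in-process approach described above follows. It leans on Catalyst internals (queryExecution.logical), so it may break across Spark versions; the required column name, the extra predicate, and userSqlQuery are invented for illustration.

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.catalyst.plans.logical.Filter

  // Reject under-constrained queries: check whether any Filter in the logical plan
  // references the required column.
  def isConstrainedOn(df: DataFrame, requiredColumn: String): Boolean = {
    df.queryExecution.logical.collect { case f: Filter => f }
      .exists(_.condition.references.exists(_.name.equalsIgnoreCase(requiredColumn)))
  }

  val userDf = sqlContext.sql(userSqlQuery)   // userSqlQuery comes from the UI
  require(isConstrainedOn(userDf, "event_date"), "query must filter on event_date")

  // Augment instead of rewriting the plan by hand: an extra filter added through the
  // DataFrame API still goes through the usual analysis and optimization passes.
  val constrainedDf = userDf.filter("school_id = 42")

Rewriting the logical plan directly and re-planning it means calling into package-private machinery, as noted above, which is why adding the extra predicate at the DataFrame level tends to be the more stable option.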


Re: SIGTERM 15 Issue: Spark Streaming for ingesting huge text files using custom Receiver

2015-09-12 Thread Jörn Franke
I am not sure what you are trying to achieve here. Have you thought about
using Flume? Additionally, maybe something like rsync?

On Sat, Sep 12, 2015 at 0:02, Varadhan, Jawahar 
wrote:

> Hi all,
>    I have coded a custom receiver which receives Kafka messages. These
> Kafka messages have FTP server credentials in them. The receiver then opens
> the message and uses the FTP credentials in it to connect to the FTP
> server. It then streams a huge text file (3.3 GB). Finally, this stream is
> read line by line using a buffered reader and pushed to Spark Streaming
> via the receiver's "store" method. The Spark Streaming process receives all
> these lines and stores them in HDFS.
>
> With this process I could ingest small files (50 MB) but can't ingest the
> 3.3 GB file. I get a YARN SIGTERM 15 in the Spark Streaming
> process. Also, I tried reading that 3.3 GB file directly (without the custom
> receiver) in Spark Streaming using ssc.textFileStream, and everything works
> fine and the file ends up in HDFS.
>
> Please let me know what I might have to do to get this working with the
> receiver. I know there are better ways to ingest the file but we need to
> use Spark Streaming in our case.
>
> Thanks.
>
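For reference, here is a minimal, hypothetical sketch of the kind of receiver described above. The Kafka/FTP plumbing is reduced to a function that opens an InputStream, and credential handling is omitted.

  import java.io.{BufferedReader, InputStream, InputStreamReader}
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.receiver.Receiver

  // Streams a remote file line by line and hands each line to Spark Streaming via store().
  class FtpLineReceiver(openStream: () => InputStream)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

    override def onStart(): Unit = {
      new Thread("ftp-line-receiver") {
        override def run(): Unit = {
          val reader = new BufferedReader(new InputStreamReader(openStream()))
          try {
            var line = reader.readLine()
            while (line != null && !isStopped()) {
              store(line)               // store(iterator) variants also exist and batch
              line = reader.readLine()  // lines, which can ease memory pressure on big files
            }
          } finally {
            reader.close()
          }
        }
      }.start()
    }

    override def onStop(): Unit = {}    // the reading thread checks isStopped()
  }

It would be wired in with ssc.receiverStream(new FtpLineReceiver(...)) and the resulting DStream written out to HDFS, roughly as in the setup described above.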


Re: Setting up Spark/flume/? to Ingest 10TB from FTP

2015-08-14 Thread Jörn Franke
Well, what do you do in case of failure?
I think one should use a professional ingestion tool that ideally does not
need to reload everything after a failure and that verifies, via checksums,
that the file has been transferred correctly.
I am not sure whether Flume supports FTP, but SSH/SCP should be supported. You
may also check other Flume sources, or write your own for FTP (taking the
comments above into account). I hope your file is compressed.
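If a hand-rolled copy is unavoidable, here is a minimal sketch that streams straight from FTP into HDFS without holding the file in one JVM heap. It assumes Apache commons-net and the Hadoop FileSystem API are available; the host, credentials, and paths are invented, and checksum verification still depends on the source publishing one.

  import org.apache.commons.net.ftp.{FTP, FTPClient}
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}
  import org.apache.hadoop.io.IOUtils

  val ftp = new FTPClient()
  ftp.connect("ftp.example.com")
  ftp.login("user", "password")
  ftp.enterLocalPassiveMode()
  ftp.setFileType(FTP.BINARY_FILE_TYPE)

  val in  = ftp.retrieveFileStream("/export/huge_file.txt")
  val fs  = FileSystem.get(new Configuration())
  val out = fs.create(new Path("/ingest/huge_file.txt"))

  // copyBytes streams fixed-size buffers, so the huge file never has to fit in memory;
  // the trailing 'true' closes both streams when the copy finishes
  IOUtils.copyBytes(in, out, 4096, true)

  if (!ftp.completePendingCommand()) {
    throw new RuntimeException("FTP transfer did not complete cleanly")
  }
  ftp.logout()
  ftp.disconnect()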

On Fri, Aug 14, 2015 at 22:23, Marcelo Vanzin van...@cloudera.com wrote:

 Why do you need to use Spark or Flume for this?

 You can just use curl and hdfs:

   curl ftp://blah | hdfs dfs -put - /blah


 On Fri, Aug 14, 2015 at 1:15 PM, Varadhan, Jawahar 
 varad...@yahoo.com.invalid wrote:

 What is the best way to bring such a huge file from an FTP server into
 Hadoop to persist in HDFS? Since a single JVM process might run out of
 memory, I was wondering if I can use Spark or Flume to do this. Any help on
 this matter is appreciated.

 I would prefer an application/process running inside Hadoop that does this
 transfer.

 Thanks.




 --
 Marcelo