Error in java_gateway.py

2018-08-08 Thread shubham
Following the code snippets on this thread, I got a working version of
XGBoost on PySpark. But one issue I am still facing is the following:
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/dummy_package/xgboost/xgboost.py", line 92, in __init__
    self._java_obj = self._new_java_obj("ml.dmlc.xgboost4j.scala.spark.XGBoostEstimator", self.uid)
  File "/Users/ultrauser/Downloads/spark/python/pyspark/ml/wrapper.py", line 61, in _new_java_obj
    java_obj = getattr(java_obj, name)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/py4j/java_gateway.py", line 1598, in __getattr__
    raise Py4JError("{0} does not exist in the JVM".format(new_fqn))
py4j.protocol.Py4JError: ml.dmlc.xgboost4j.scala.spark.XGBoostEstimator does not exist in the JVM
Exception ignored in:
Traceback (most recent call last):
  File "/Users/ultrauser/Downloads/spark/python/pyspark/ml/wrapper.py", line 105, in __del__
    SparkContext._active_spark_context._gateway.detach(self._java_obj)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/py4j/java_gateway.py", line 2000, in detach
    java_object._detach()
AttributeError: 'NoneType' object has no attribute '_detach'
From what I read on StackOverflow and elsewhere, this looks like an issue of
jar locations. I have two jar files that are needed for this code to work:

xgboost4j-0.72.jar
xgboost4j-spark-0.72.jar
But I am not sure how to proceed. This is what I have tried so far:

1. Place the xgboost jar files in /Library/Java/Extensions
2. Set the environment variables:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--jars /Users/ultrauser/Downloads/xgboost4j-0.72.jar,'
    '/Users/ultrauser/Downloads/xgboost4j-spark-0.72.jar pyspark-shell'
)

But the error still persists. Is there something I am missing here?
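
For reference, a minimal sketch of how I understand the jars need to be wired up, reusing the paths above (the app name is just a placeholder): PYSPARK_SUBMIT_ARGS is only read when the py4j gateway is launched, so it has to be set before the first SparkSession/SparkContext is created; alternatively the same jars can be passed via spark.jars.

import os

# must be set before the first SparkSession/SparkContext is created in this process
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--jars /Users/ultrauser/Downloads/xgboost4j-0.72.jar,'
    '/Users/ultrauser/Downloads/xgboost4j-spark-0.72.jar pyspark-shell'
)

from pyspark.sql import SparkSession

# alternatively, point spark.jars at the same files when building the session
spark = (SparkSession.builder
         .appName("xgboost4j-test")   # placeholder app name
         .config("spark.jars",
                 "/Users/ultrauser/Downloads/xgboost4j-0.72.jar,"
                 "/Users/ultrauser/Downloads/xgboost4j-spark-0.72.jar")
         .getOrCreate())

If a SparkContext already exists in the Python process (for example in a long-running notebook), setting the environment variable has no effect until the process is restarted.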




Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Koert Kuipers
a new map task after a shuffle is also a narrow dependency, isn't it? it's
narrow because data doesn't need to move, e.g. every partition depends on a
single partition, preferably on the same machine.

modifying the previous shuffle to avoid a new shuffle strikes me as odd, and can
potentially make a mess of performance, especially when no shuffle is
needed; just a new map task.
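
for what it's worth, a small pyspark sketch (placeholder path and column name, assuming the usual `spark` session variable) that makes this visible: with coalesce(100) the plan shows a Coalesce node and the stage that reads the groupBy shuffle runs with only 100 tasks, while with repartition(100) that stage keeps spark.sql.shuffle.partitions tasks and an extra Exchange follows.

# sketch, assuming a parquet input at /data/in and a column named "column"
df = spark.read.format("parquet").load("/data/in")
df.groupBy("column").count().coalesce(100).explain()      # Coalesce 100 on top of the aggregate
df.groupBy("column").count().repartition(100).explain()   # extra Exchange after the aggregate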


On Thu, Aug 9, 2018 at 1:15 AM, Jungtaek Lim  wrote:

> > shouldnt coalesce introduce a new map-phase with less tasks instead of
> changing the previous shuffle?
>
> The javadoc of Dataset.coalesce [1] describes such behavior clearly. It
> results in narrow dependency, hence no shuffle.
>
> So it is pretty clear that you need to use "repartition". Not sure there's
> any available trick to achieve it without calling repartition.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> 1. https://github.com/apache/spark/blob/a40806d2bd84e9a0308165f0d6c97e
> 9cf00aa4a3/sql/core/src/main/scala/org/apache/spark/sql/
> Dataset.scala#L2918-L2937
>
>
> On Thu, Aug 9, 2018 at 5:55 AM, Koert Kuipers wrote:
>
>> sorry i meant to say:
>> wit a checkpoint i get a map phase with lots of tasks to read the data,
>> then a reduce phase with 2048 reducers, and then finally a map phase with
>> 100 tasks.
>>
>> On Wed, Aug 8, 2018 at 4:54 PM, Koert Kuipers  wrote:
>>
>>> the only thing that seems to stop this so far is a checkpoint.
>>>
>>> wit a checkpoint i get a map phase with lots of tasks to read the data,
>>> then a reduce phase with 2048 reducers, and then finally a map phase with 4
>>> tasks.
>>>
>>> now i need to figure out how to do this without having to checkpoint. i
>>> wish i could insert something like a dummy operation that logical steps
>>> cannot jump over.
>>>
>>> On Wed, Aug 8, 2018 at 4:22 PM, Koert Kuipers  wrote:
>>>
 ok thanks.

 mh. that seems odd. shouldnt coalesce introduce a new map-phase
 with less tasks instead of changing the previous shuffle?

 using repartition seems too expensive just to keep the number of files
 down. so i guess i am back to looking for another solution.



 On Wed, Aug 8, 2018 at 4:13 PM, Vadim Semenov 
 wrote:

> `coalesce` sets the number of partitions for the last stage, so you
> have to use `repartition` instead which is going to introduce an extra
> shuffle stage
>
> On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers 
> wrote:
> >
> > one small correction: lots of files leads to pressure on the spark
> driver program when reading this data in spark.
> >
> > On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers 
> wrote:
> >>
> >> hi,
> >>
> >> i am reading data from files into a dataframe, then doing a groupBy
> for a given column with a count, and finally i coalesce to a smaller 
> number
> of partitions before writing out to disk. so roughly:
> >>
> >> spark.read.format(...).load(...).groupBy(column).count().
> coalesce(100).write.format(...).save(...)
> >>
> >> i have this setting: spark.sql.shuffle.partitions=2048
> >>
> >> i expect to see 2048 partitions in shuffle. what i am seeing
> instead is a shuffle with only 100 partitions. it's like the coalesce has
> taken over the partitioning of the groupBy.
> >>
> >> any idea why?
> >>
> >> i am doing coalesce because it is not helpful to write out 2048
> files, lots of files leads to pressure down the line on executors reading
> this data (i am writing to just one partition of a larger dataset), and
> since i have less than 100 executors i expect it to be efficient. so 
> sounds
> like a good idea, no?
> >>
> >> but i do need 2048 partitions in my shuffle due to the operation i
> am doing in the groupBy (in my real problem i am not just doing a 
> count...).
> >>
> >> thanks!
> >> koert
> >>
> >
>
>
> --
> Sent from my iPhone
>


>>>
>>


Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Jungtaek Lim
> shouldnt coalesce introduce a new map-phase with less tasks instead of
changing the previous shuffle?

The javadoc of Dataset.coalesce [1] describes this behavior clearly. It
results in a narrow dependency, hence no shuffle.

So it is pretty clear that you need to use "repartition". I am not sure there is
any available trick to achieve it without calling repartition.
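
A minimal sketch of that suggestion, reusing the (placeholder) pipeline from the original mail: swapping coalesce for repartition keeps spark.sql.shuffle.partitions=2048 for the groupBy and adds one extra round-robin shuffle down to 100 output partitions.

# sketch with placeholder formats, paths and column name
(spark.read.format("parquet").load("/data/in")
    .groupBy("column").count()       # aggregation runs with 2048 shuffle partitions
    .repartition(100)                # extra shuffle, unlike coalesce(100)
    .write.format("parquet").save("/data/out"))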

Thanks,
Jungtaek Lim (HeartSaVioR)

1.
https://github.com/apache/spark/blob/a40806d2bd84e9a0308165f0d6c97e9cf00aa4a3/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2918-L2937


On Thu, Aug 9, 2018 at 5:55 AM, Koert Kuipers wrote:

> sorry i meant to say:
> wit a checkpoint i get a map phase with lots of tasks to read the data,
> then a reduce phase with 2048 reducers, and then finally a map phase with
> 100 tasks.
>
> On Wed, Aug 8, 2018 at 4:54 PM, Koert Kuipers  wrote:
>
>> the only thing that seems to stop this so far is a checkpoint.
>>
>> wit a checkpoint i get a map phase with lots of tasks to read the data,
>> then a reduce phase with 2048 reducers, and then finally a map phase with 4
>> tasks.
>>
>> now i need to figure out how to do this without having to checkpoint. i
>> wish i could insert something like a dummy operation that logical steps
>> cannot jump over.
>>
>> On Wed, Aug 8, 2018 at 4:22 PM, Koert Kuipers  wrote:
>>
>>> ok thanks.
>>>
>>> mh. that seems odd. shouldnt coalesce introduce a new map-phase with
>>> less tasks instead of changing the previous shuffle?
>>>
>>> using repartition seems too expensive just to keep the number of files
>>> down. so i guess i am back to looking for another solution.
>>>
>>>
>>>
>>> On Wed, Aug 8, 2018 at 4:13 PM, Vadim Semenov 
>>> wrote:
>>>
 `coalesce` sets the number of partitions for the last stage, so you
 have to use `repartition` instead which is going to introduce an extra
 shuffle stage

 On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers  wrote:
 >
 > one small correction: lots of files leads to pressure on the spark
 driver program when reading this data in spark.
 >
 > On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers 
 wrote:
 >>
 >> hi,
 >>
 >> i am reading data from files into a dataframe, then doing a groupBy
 for a given column with a count, and finally i coalesce to a smaller number
 of partitions before writing out to disk. so roughly:
 >>
 >>
 spark.read.format(...).load(...).groupBy(column).count().coalesce(100).write.format(...).save(...)
 >>
 >> i have this setting: spark.sql.shuffle.partitions=2048
 >>
 >> i expect to see 2048 partitions in shuffle. what i am seeing instead
 is a shuffle with only 100 partitions. it's like the coalesce has taken
 over the partitioning of the groupBy.
 >>
 >> any idea why?
 >>
 >> i am doing coalesce because it is not helpful to write out 2048
 files, lots of files leads to pressure down the line on executors reading
 this data (i am writing to just one partition of a larger dataset), and
 since i have less than 100 executors i expect it to be efficient. so sounds
 like a good idea, no?
 >>
 >> but i do need 2048 partitions in my shuffle due to the operation i
 am doing in the groupBy (in my real problem i am not just doing a 
 count...).
 >>
 >> thanks!
 >> koert
 >>
 >


 --
 Sent from my iPhone

>>>
>>>
>>
>


unsubscribe

2018-08-08 Thread 네이버


> On 8 Aug 2018, at 17:35, Daniel Zhang  wrote:
> 
> Hi, 
> 
> I have one question related to run unit test in Intellij.
> 
> I import spark into my Intellij as Maven project, and have no issue to build 
> the whole project. 
> 
> While for some unit tests, I see they can be run directly in Intellij as unit 
> tests, but others are not.
> 
> Just for example, in the test package: org.apache.spark.sql.hive.execution, 
> there are 2 tests Scala class: PruningSuite and ScriptTransformationSuite, 
> both eventually extends from SparkFunSuite.scala, which is subclass of 
> scalatest FunSuite.
> 
> But in my Intellij, as shown in the picture:
> 
> 
> 
> The PruningSuite is marked with a Scala class icon, and I can click it with "Run 
> PruningSuite" directly to run this unit test. But for 
> ScriptTransformationSuite, it is marked with a different icon, and the "Run 
> " unit test option is not there.
> 
> Anyone knows the reason?
> 
> Thanks
> 
> Yong


[Structured Streaming] Understanding waterMark, flatMapGroupWithState and possibly windowing

2018-08-08 Thread subramgr
Hi,

We have a use case where we need to *sessionize* our data and, for each
*session*, emit some *metrics*. We need to handle *repeated sessions* and
*session timeouts*. We have come up with the following code structure and
would like to understand whether we understand all the concepts of *watermark*
and *flatMapGroupsWithState*.
Can someone help me understand the following:

1. Will my memory consumption keep increasing?
2. Is my understanding correct that the *aggMetrics* data frame is a bounded
data frame and will always contain the last 30 minutes' worth of data?
3. When I do the aggregation in Step 2, will Spark only use the last 30
minutes of data for aggregation?

Here is the spark streaming code:
Step 1:
// 1. I am getting the data from Kafka, around 50k events per second.
// 2. I am using a 30 minute watermark to filter out events that are arriving late.
// 3. I am using EventTimeTimeout.
// 4. My `updateSessionState` func returns Iterator[Metric] (at minute granularity).
val metrics: Dataset[Metric] = kafkaEvents
  .withWatermark("timestamp", "30 minutes")
  .groupByKey(e => e._2.sessionId)
  .flatMapGroupsWithState(
    OutputMode.Update(),
    GroupStateTimeout.EventTimeTimeout())(updateSessionState(broadcastWrapper))

Step 2:
// 1. I am aggregating the data by the metric name and the minute.
// 2. I am using the 30 minute watermark here again, assuming the results in
//    *metrics* which are more than 30 mins old will be removed from memory.
//    Is my assumption correct?
// 3. Is *aggMetrics* a bounded data frame which will only hold the last 30
//    minutes of data?
val aggMetrics: Dataset[(String, Double)] = metrics
  .map(metric => (long2timestamp(metric.timestamp), metric))
  .toDF("timestamp", "metric")
  .as[(Timestamp, Metric)]
  .withWatermark("timestamp", "30 minutes")
  .map {
    case (_, m) => (s"${m.name}.${m.timestamp}", m.count)
  }
  .groupByKey(_._1)
  .agg(typed.sum(_._2))

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

Step 3:
// 1. All the metrics that got updated I am emitting to KairosDB.
aggMetrics
  .writeStream
  .format("com.walmart.cxtools.expo.kairos.KairosSinkProvider")
  .option("checkpointLocation", checkpointLocation)
  .outputMode(OutputMode.Update())
  .trigger(Trigger.ProcessingTime(60.seconds))
  .start()
  .awaitTermination()







Re: Run/install tensorframes on zeppelin pyspark

2018-08-08 Thread Jeff Zhang
Make sure you use the correct Python, the one which has tensorframes installed.
Use PYSPARK_PYTHON to configure which Python is used.
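
For example, a quick way to check from the notebook (assuming the usual `sc` variable that Zeppelin injects) which Python the driver and the executors actually run, so you can confirm it is the one where `pip install tensorframes` was done:

%pyspark
import sys

print(sys.executable)   # python used by the driver

# python used on the executors (runs one trivial task)
print(sc.range(1).map(lambda _: __import__("sys").executable).collect())

If that is not the environment where tensorframes is installed, set PYSPARK_PYTHON to that interpreter (for example in zeppelin-env.sh or the spark interpreter settings) and restart the interpreter.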



On Wed, Aug 8, 2018 at 9:59 PM, Spico Florin wrote:

> Hi!
>
> I would like to use tensorframes in my pyspark notebook.
>
> I have performed the following:
>
> 1. In the spark intepreter adde a new repository
> http://dl.bintray.com/spark-packages/maven
> 2. in the spark interpreter added the
> dependency databricks:tensorframes:0.2.9-s_2.11
> 3. pip install tensorframes
>
>
> In both 0.7.3 and 0.8.0:
> 1.  the following code resulted in error: "ImportError: No module named
> tensorframes"
>
> %pyspark
> import tensorframes as tfs
>
> 2. the following code succeeded
> %spark
> import org.tensorframes.{dsl => tf}
> import org.tensorframes.dsl.Implicits._
> val df = spark.createDataFrame(Seq(1.0->1.1, 2.0->2.2)).toDF("a", "b")
>
> // As in Python, scoping is recommended to prevent name collisions.
> val df2 = tf.withGraph {
> val a = df.block("a")
> // Unlike python, the scala syntax is more flexible:
> val out = a + 3.0 named "out"
> // The 'mapBlocks' method is added using implicits to dataframes.
> df.mapBlocks(out).select("a", "out")
> }
>
> // The transform is all lazy at this point, let's execute it with collect:
> df2.collect()
>
> I ran the code above directly with spark interpreter with the default
> configurations (master set up to local[*] - so not via spark-submit
> command) .
>
> Also, I have installed spark home locally and ran the command
>
> $SPARK_HOME/bin/pyspark --packages databricks:tensorframes:0.2.9-s_2.11
>
> and the code below worked as expcted
>
> import tensorframes as tfs
>
>  Can you please help to solve this?
>
> Thanks,
>
>  Florin
>
>
>
>
>
>
>
>
>


Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Koert Kuipers
sorry i meant to say:
with a checkpoint i get a map phase with lots of tasks to read the data,
then a reduce phase with 2048 reducers, and then finally a map phase with
100 tasks.

On Wed, Aug 8, 2018 at 4:54 PM, Koert Kuipers  wrote:

> the only thing that seems to stop this so far is a checkpoint.
>
> wit a checkpoint i get a map phase with lots of tasks to read the data,
> then a reduce phase with 2048 reducers, and then finally a map phase with 4
> tasks.
>
> now i need to figure out how to do this without having to checkpoint. i
> wish i could insert something like a dummy operation that logical steps
> cannot jump over.
>
> On Wed, Aug 8, 2018 at 4:22 PM, Koert Kuipers  wrote:
>
>> ok thanks.
>>
>> mh. that seems odd. shouldnt coalesce introduce a new map-phase with
>> less tasks instead of changing the previous shuffle?
>>
>> using repartition seems too expensive just to keep the number of files
>> down. so i guess i am back to looking for another solution.
>>
>>
>>
>> On Wed, Aug 8, 2018 at 4:13 PM, Vadim Semenov 
>> wrote:
>>
>>> `coalesce` sets the number of partitions for the last stage, so you
>>> have to use `repartition` instead which is going to introduce an extra
>>> shuffle stage
>>>
>>> On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers  wrote:
>>> >
>>> > one small correction: lots of files leads to pressure on the spark
>>> driver program when reading this data in spark.
>>> >
>>> > On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers 
>>> wrote:
>>> >>
>>> >> hi,
>>> >>
>>> >> i am reading data from files into a dataframe, then doing a groupBy
>>> for a given column with a count, and finally i coalesce to a smaller number
>>> of partitions before writing out to disk. so roughly:
>>> >>
>>> >> spark.read.format(...).load(...).groupBy(column).count().coa
>>> lesce(100).write.format(...).save(...)
>>> >>
>>> >> i have this setting: spark.sql.shuffle.partitions=2048
>>> >>
>>> >> i expect to see 2048 partitions in shuffle. what i am seeing instead
>>> is a shuffle with only 100 partitions. it's like the coalesce has taken
>>> over the partitioning of the groupBy.
>>> >>
>>> >> any idea why?
>>> >>
>>> >> i am doing coalesce because it is not helpful to write out 2048
>>> files, lots of files leads to pressure down the line on executors reading
>>> this data (i am writing to just one partition of a larger dataset), and
>>> since i have less than 100 executors i expect it to be efficient. so sounds
>>> like a good idea, no?
>>> >>
>>> >> but i do need 2048 partitions in my shuffle due to the operation i am
>>> doing in the groupBy (in my real problem i am not just doing a count...).
>>> >>
>>> >> thanks!
>>> >> koert
>>> >>
>>> >
>>>
>>>
>>> --
>>> Sent from my iPhone
>>>
>>
>>
>


Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Koert Kuipers
the only thing that seems to stop this so far is a checkpoint.

with a checkpoint i get a map phase with lots of tasks to read the data,
then a reduce phase with 2048 reducers, and then finally a map phase with 4
tasks.

now i need to figure out how to do this without having to checkpoint. i
wish i could insert something like a dummy operation that logical steps
cannot jump over.
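
for reference, a rough pyspark sketch of that checkpoint workaround (placeholder paths and column name; Dataset.checkpoint is eager by default and needs a checkpoint directory):

# checkpointing materializes the aggregation, so the later coalesce can no
# longer be folded into the groupBy's shuffle stage
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

counts = (spark.read.format("parquet").load("/data/in")
          .groupBy("column").count()
          .checkpoint())                # this stage runs with spark.sql.shuffle.partitions tasks

counts.coalesce(100).write.format("parquet").save("/data/out")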

On Wed, Aug 8, 2018 at 4:22 PM, Koert Kuipers  wrote:

> ok thanks.
>
> mh. that seems odd. shouldnt coalesce introduce a new map-phase with
> less tasks instead of changing the previous shuffle?
>
> using repartition seems too expensive just to keep the number of files
> down. so i guess i am back to looking for another solution.
>
>
>
> On Wed, Aug 8, 2018 at 4:13 PM, Vadim Semenov  wrote:
>
>> `coalesce` sets the number of partitions for the last stage, so you
>> have to use `repartition` instead which is going to introduce an extra
>> shuffle stage
>>
>> On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers  wrote:
>> >
>> > one small correction: lots of files leads to pressure on the spark
>> driver program when reading this data in spark.
>> >
>> > On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers 
>> wrote:
>> >>
>> >> hi,
>> >>
>> >> i am reading data from files into a dataframe, then doing a groupBy
>> for a given column with a count, and finally i coalesce to a smaller number
>> of partitions before writing out to disk. so roughly:
>> >>
>> >> spark.read.format(...).load(...).groupBy(column).count().coa
>> lesce(100).write.format(...).save(...)
>> >>
>> >> i have this setting: spark.sql.shuffle.partitions=2048
>> >>
>> >> i expect to see 2048 partitions in shuffle. what i am seeing instead
>> is a shuffle with only 100 partitions. it's like the coalesce has taken
>> over the partitioning of the groupBy.
>> >>
>> >> any idea why?
>> >>
>> >> i am doing coalesce because it is not helpful to write out 2048 files,
>> lots of files leads to pressure down the line on executors reading this
>> data (i am writing to just one partition of a larger dataset), and since i
>> have less than 100 executors i expect it to be efficient. so sounds like a
>> good idea, no?
>> >>
>> >> but i do need 2048 partitions in my shuffle due to the operation i am
>> doing in the groupBy (in my real problem i am not just doing a count...).
>> >>
>> >> thanks!
>> >> koert
>> >>
>> >
>>
>>
>> --
>> Sent from my iPhone
>>
>
>


Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Koert Kuipers
ok thanks.

mh. that seems odd. shouldn't coalesce introduce a new map-phase with
fewer tasks instead of changing the previous shuffle?

using repartition seems too expensive just to keep the number of files
down. so i guess i am back to looking for another solution.



On Wed, Aug 8, 2018 at 4:13 PM, Vadim Semenov  wrote:

> `coalesce` sets the number of partitions for the last stage, so you
> have to use `repartition` instead which is going to introduce an extra
> shuffle stage
>
> On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers  wrote:
> >
> > one small correction: lots of files leads to pressure on the spark
> driver program when reading this data in spark.
> >
> > On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers  wrote:
> >>
> >> hi,
> >>
> >> i am reading data from files into a dataframe, then doing a groupBy for
> a given column with a count, and finally i coalesce to a smaller number of
> partitions before writing out to disk. so roughly:
> >>
> >> spark.read.format(...).load(...).groupBy(column).count().
> coalesce(100).write.format(...).save(...)
> >>
> >> i have this setting: spark.sql.shuffle.partitions=2048
> >>
> >> i expect to see 2048 partitions in shuffle. what i am seeing instead is
> a shuffle with only 100 partitions. it's like the coalesce has taken over
> the partitioning of the groupBy.
> >>
> >> any idea why?
> >>
> >> i am doing coalesce because it is not helpful to write out 2048 files,
> lots of files leads to pressure down the line on executors reading this
> data (i am writing to just one partition of a larger dataset), and since i
> have less than 100 executors i expect it to be efficient. so sounds like a
> good idea, no?
> >>
> >> but i do need 2048 partitions in my shuffle due to the operation i am
> doing in the groupBy (in my real problem i am not just doing a count...).
> >>
> >> thanks!
> >> koert
> >>
> >
>
>
> --
> Sent from my iPhone
>


Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Vadim Semenov
`coalesce` sets the number of partitions for the last stage, so you
have to use `repartition` instead, which is going to introduce an extra
shuffle stage

On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers  wrote:
>
> one small correction: lots of files leads to pressure on the spark driver 
> program when reading this data in spark.
>
> On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers  wrote:
>>
>> hi,
>>
>> i am reading data from files into a dataframe, then doing a groupBy for a 
>> given column with a count, and finally i coalesce to a smaller number of 
>> partitions before writing out to disk. so roughly:
>>
>> spark.read.format(...).load(...).groupBy(column).count().coalesce(100).write.format(...).save(...)
>>
>> i have this setting: spark.sql.shuffle.partitions=2048
>>
>> i expect to see 2048 partitions in shuffle. what i am seeing instead is a 
>> shuffle with only 100 partitions. it's like the coalesce has taken over the 
>> partitioning of the groupBy.
>>
>> any idea why?
>>
>> i am doing coalesce because it is not helpful to write out 2048 files, lots 
>> of files leads to pressure down the line on executors reading this data (i 
>> am writing to just one partition of a larger dataset), and since i have less 
>> than 100 executors i expect it to be efficient. so sounds like a good idea, 
>> no?
>>
>> but i do need 2048 partitions in my shuffle due to the operation i am doing 
>> in the groupBy (in my real problem i am not just doing a count...).
>>
>> thanks!
>> koert
>>
>


-- 
Sent from my iPhone




Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Koert Kuipers
one small correction: lots of files lead to pressure on the spark driver
program when reading this data in spark.

On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers  wrote:

> hi,
>
> i am reading data from files into a dataframe, then doing a groupBy for a
> given column with a count, and finally i coalesce to a smaller number of
> partitions before writing out to disk. so roughly:
>
> spark.read.format(...).load(...).groupBy(column).count().
> coalesce(100).write.format(...).save(...)
>
> i have this setting: spark.sql.shuffle.partitions=2048
>
> i expect to see 2048 partitions in shuffle. what i am seeing instead is a
> shuffle with only 100 partitions. it's like the coalesce has taken over the
> partitioning of the groupBy.
>
> any idea why?
>
> i am doing coalesce because it is not helpful to write out 2048 files,
> lots of files leads to pressure down the line on executors reading this
> data (i am writing to just one partition of a larger dataset), and since i
> have less than 100 executors i expect it to be efficient. so sounds like a
> good idea, no?
>
> but i do need 2048 partitions in my shuffle due to the operation i am
> doing in the groupBy (in my real problem i am not just doing a count...).
>
> thanks!
> koert
>
>


groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Koert Kuipers
hi,

i am reading data from files into a dataframe, then doing a groupBy for a
given column with a count, and finally i coalesce to a smaller number of
partitions before writing out to disk. so roughly:

spark.read.format(...).load(...).groupBy(column).count().coalesce(100).write.format(...).save(...)

i have this setting: spark.sql.shuffle.partitions=2048

i expect to see 2048 partitions in the shuffle. what i am seeing instead is a
shuffle with only 100 partitions. it's like the coalesce has taken over the
partitioning of the groupBy.

any idea why?

i am doing the coalesce because it is not helpful to write out 2048 files: lots
of files lead to pressure down the line on executors reading this data (i
am writing to just one partition of a larger dataset), and since i have
fewer than 100 executors i expect 100 partitions to be efficient. so it sounds
like a good idea, no?

but i do need 2048 partitions in my shuffle due to the operation i am doing
in the groupBy (in my real problem i am not just doing a count...).

thanks!
koert


Data source jdbc does not support streamed reading

2018-08-08 Thread James Starks
Now my spark job can perform sql operations against a database table. Next I want
to combine that with a streaming context, so I switched to the readStream() function.
But after job submission, spark throws:

Exception in thread "main" java.lang.UnsupportedOperationException: Data 
source jdbc does not support streamed reading

That looks like sparkSession.readStream.format("jdbc")... is not allowed; the jdbc
source doesn't support streaming.

val sparkSession = SparkSession.builder().appName("my-test").getOrCreate()
import sparkSession.implicits._
val df = sparkSession.readStream.format("jdbc")...load()
// other operations against df

Checking the example - 
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala

Also searching on the internet, I don't see any examples close to my need.
Any pointers, docs, or code snippets that illustrate this use case?
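
For reference, the jdbc source only implements batch reading, so the equivalent spark.read call does work; a minimal pyspark sketch with placeholder connection options:

# placeholder URL, table and credentials; only the batch reader supports jdbc
df = (spark.read.format("jdbc")
      .option("url", "jdbc:postgresql://db-host:5432/mydb")
      .option("dbtable", "my_table")
      .option("user", "user")
      .option("password", "password")
      .load())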

Thanks

Run/install tensorframes on zeppelin pyspark

2018-08-08 Thread Spico Florin
Hi!

I would like to use tensorframes in my pyspark notebook.

I have performed the following:

1. In the spark interpreter added a new repository:
   http://dl.bintray.com/spark-packages/maven
2. In the spark interpreter added the dependency
   databricks:tensorframes:0.2.9-s_2.11
3. pip install tensorframes


In both Zeppelin 0.7.3 and 0.8.0:
1. The following code resulted in the error "ImportError: No module named
tensorframes":

%pyspark
import tensorframes as tfs

2. the following code succeeded
%spark
import org.tensorframes.{dsl => tf}
import org.tensorframes.dsl.Implicits._
val df = spark.createDataFrame(Seq(1.0->1.1, 2.0->2.2)).toDF("a", "b")

// As in Python, scoping is recommended to prevent name collisions.
val df2 = tf.withGraph {
  val a = df.block("a")
  // Unlike python, the scala syntax is more flexible:
  val out = a + 3.0 named "out"
  // The 'mapBlocks' method is added using implicits to dataframes.
  df.mapBlocks(out).select("a", "out")
}

// The transform is all lazy at this point, let's execute it with collect:
df2.collect()

I ran the code above directly with the spark interpreter with the default
configuration (master set to local[*], so not via the spark-submit command).

Also, I have installed spark home locally and ran the command

$SPARK_HOME/bin/pyspark --packages databricks:tensorframes:0.2.9-s_2.11

and the code below worked as expected:

import tensorframes as tfs

 Can you please help to solve this?

Thanks,

 Florin


Re: Replacing groupBykey() with reduceByKey()

2018-08-08 Thread Biplob Biswas
Hi Santhosh,

My name is not Bipin, it's Biplob, as is clear from my signature.

Regarding your question, I have no clue what your map operation is doing on
the grouped data, so I can only suggest you do:

dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x:
(x[0],x)).reduceByKey(build_edges, 25)

Although, based on the return type, you would have to modify your build_edges
function.
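
For illustration, one common way to restructure this (a sketch, assuming build_edges can work on a plain list of rows per key): reduceByKey only ever combines two values at a time and must be associative, so the per-group logic cannot go into the reduce function itself; instead, accumulate the rows per key with an associative list concatenation and apply build_edges afterwards.

# sketch: seed each row as a one-element list, reduce by concatenating lists,
# then run the original per-group build_edges logic on (key, [rows]) pairs
dd = (hive_context.read.orc(orcfile_dir).rdd
      .map(lambda x: (x[0], [x]))
      .reduceByKey(lambda a, b: a + b, 25)
      .map(build_edges))

Note that this does not reduce the shuffled data compared to groupByKey; a real win from reduceByKey needs the per-group computation itself to be expressible as an associative combine.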

Thanks & Regards
Biplob Biswas


On Mon, Aug 6, 2018 at 6:28 PM Bathi CCDB  wrote:

> Hey Bipin,
> Thanks for the reply, I am actually aggregating after the groupByKey() 
> operation,
> I have posted the wrong code snippet in my first email. Here is what I am
> doing
>
> dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x: 
> (x[0],x)).groupByKey(25).map(build_edges)
>
> Can we replace reduceByKey() in this context ?
>
> Santhosh
>
>
> On Mon, Aug 6, 2018 at 1:20 AM, Biplob Biswas 
> wrote:
>
>> Hi Santhosh,
>>
>> If you are not performing any aggregation, then I don't think you can
>> replace your groupbykey with a reducebykey, and as I see you are only
>> grouping and taking 2 values of the result, thus I believe you can't just
>> replace your groupbykey with that.
>>
>> Thanks & Regards
>> Biplob Biswas
>>
>>
>> On Sat, Aug 4, 2018 at 12:05 AM Bathi CCDB  wrote:
>>
>>> I am trying to replace groupByKey() with reudceByKey(), I am a pyspark
>>> and python newbie and I am having a hard time figuring out the lambda
>>> function for the reduceByKey() operation.
>>>
>>> Here is the code
>>>
>>> dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x: 
>>> (x[0],x)).groupByKey(25).take(2)
>>>
>>> Here is the return value
>>>
>>> >>> dd
>>> [(u'KEY_1', <pyspark.resultiterable.ResultIterable object at 0x107be0c50>),
>>> (u'KEY_2', <pyspark.resultiterable.ResultIterable object at 0x107be0c10>)]
>>>
>>> and Here are the iterable contents dd[0][1]
>>>
>>> Row(key=u'KEY_1', hash_fn=u'deec95d65ca6b3b4f2e1ef259040aa79', value=u'e7dc1f2a')
>>> Row(key=u'KEY_1', hash_fn=u'f8891048a9ef8331227b4af080ecd28a', value=u'fb0bc953')
>>> ...
>>> Row(key=u'KEY_1', hash_fn=u'1b9d2bb2db28603ff21052efcd13f242', value=u'd39714d3')
>>> Row(key=u'KEY_1', hash_fn=u'c41b0269706ac423732a6bab24bf8a6a', value=u'ab58db92')
>>>
>>> My question is how do replace with reduceByKey() and get the same
>>> output as above?
>>>
>>> Santhosh
>>>
>>
>


Re: need workaround around HIVE-11625 / DISTRO-800

2018-08-08 Thread Pranav Agrawal
any help please

On Tue, Aug 7, 2018 at 1:49 PM, Pranav Agrawal 
wrote:

> I am hitting issue,
> https://issues.cloudera.org/browse/DISTRO-800 (related to
> https://issues.apache.org/jira/browse/HIVE-11625)
>
> I am unable to write empty array of types int or string (array of size 0)
> into parquet, please assist or suggest workaround for the same.
>
> spark version: 2.2.1
> AWS EMR: 5.12, 5.13
>


Re: Split a row into multiple rows Java

2018-08-08 Thread Manu Zhang
The following may help, although it is in Scala. The idea is to first concatenate
each value with its time (the column name), assemble all time_value pairs into an
array and explode it, and finally split time_value back into time and value.

import org.apache.spark.sql.functions._

val ndf = df.select(col("name"), col("otherName"),
  explode(
    array(
      concat_ws(":", col("val1"), lit("val1")).alias("val1"),
      concat_ws(":", col("val2"), lit("val2")).alias("val2"),
      concat_ws(":", col("val3"), lit("val3")).alias("val3"))
  ).alias("temp"))

val fields = split(col("temp"), ":")
ndf.select(col("name"), col("otherName"),
  fields.getItem(1).alias("time"),
  fields.getItem(0).alias("value"))

Regards,
Manu Zhang

On Wed, Aug 8, 2018 at 11:41 AM nookala  wrote:

> +-----+---------+----+----+----+
> | name|otherName|val1|val2|val3|
> +-----+---------+----+----+----+
> |  bob|       b1|   1|   2|   3|
> |alive|       c1|   3|   4|   6|
> |  eve|       e1|   7|   8|   9|
> +-----+---------+----+----+----+
>
> I need this to become
>
> +-----+---------+----+-----+
> | name|otherName|time|value|
> +-----+---------+----+-----+
> |  bob|       b1|val1|    1|
> |  bob|       b1|val2|    2|
> |  bob|       b1|val3|    3|
> |alive|       c1|val1|    3|
> |alive|       c1|val2|    4|
> |alive|       c1|val3|    6|
> |  eve|       e1|val1|    7|
> |  eve|       e1|val2|    8|
> |  eve|       e1|val3|    9|
> +-----+---------+----+-----+
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>