Re: Memory problems with simple ETL in Pyspark

2017-04-15 Thread ayan guha
What I missed: try increasing the number of partitions using repartition.
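
A rough PySpark sketch of both suggestions, reusing the all_data_long and
aud_str DataFrames from the quoted thread below (the partition count of 2000 is
only an illustrative guess, not a tested value):

    from pyspark.sql.functions import broadcast

    # more, smaller partitions so each task holds less data at a time
    all_data_long = all_data_long.repartition(2000)

    # the audience table is small, so ship a copy to every executor
    # instead of shuffling the ~6bn-row table for the join
    alldata_aud = all_data_long.join(
        broadcast(aud_str),
        all_data_long['segment'] == aud_str['entry'],
        'left_outer')
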
On Sun, 16 Apr 2017 at 11:06 am, ayan guha <guha.a...@gmail.com> wrote:

> It does not look like a Scala vs Python thing. How big is your audience data
> store? Can it be broadcast?
>
> What is the memory footprint you are seeing? At what point is YARN
> killing the containers? Depending on that, you may want to tweak the number of
> partitions of the input dataset and increase the number of executors.
>
> Ayan
>
>
> On Sat, 15 Apr 2017 at 2:10 am, Patrick McCarthy <pmccar...@dstillery.com>
> wrote:
>
>> Hello,
>>
>> I'm trying to build an ETL job which takes in 30-100gb of text data and
>> prepares it for SparkML. I don't speak Scala so I've been trying to
>> implement in PySpark on YARN, Spark 2.1.
>>
>> Despite the transformations being fairly simple, the job always fails by
>> running out of executor memory.
>>
>> The input table is long (~6bn rows) but composed of three simple values:
>>
>> #
>> all_data_long.printSchema()
>>
>> root
>> |-- id: long (nullable = true)
>> |-- label: short (nullable = true)
>> |-- segment: string (nullable = true)
>>
>> #
>>
>> First I join it to a table of particular segments of interest and do an
>> aggregation,
>>
>> #
>>
>> audiences.printSchema()
>>
>> root
>>  |-- entry: integer (nullable = true)
>>  |-- descr: string (nullable = true)
>>
>>
>> print("Num in adl: {}".format(str(all_data_long.count(
>>
>> aud_str = audiences.select(audiences['entry'].cast('string'),
>> audiences['descr'])
>>
>> alldata_aud = all_data_long.join(aud_str,
>> all_data_long['segment']==aud_str['entry'],
>> 'left_outer')
>>
>> str_idx  = StringIndexer(inputCol='segment',outputCol='indexedSegs')
>>
>> idx_df   = str_idx.fit(alldata_aud)
>> label_df =
>> idx_df.transform(alldata_aud).withColumnRenamed('label','label_val')
>>
>> id_seg = (label_df
>> .filter(label_df.descr.isNotNull())
>> .groupBy('id')
>> .agg(collect_list('descr')))
>>
>> id_seg.write.saveAsTable("hive.id_seg")
>>
>> #
>>
>> Then, I use that StringIndexer again on the first data frame to featurize
>> the segment ID
>>
>> #
>>
>> alldat_idx =
>> idx_df.transform(all_data_long).withColumnRenamed('label','label_val')
>>
>> #
>>
>>
>> My ultimate goal is to make a SparseVector, so I group the indexed
>> segments by id and try to cast it into a vector
>>
>> #
>>
>> list_to_sparse_udf = udf(lambda l, maxlen: Vectors.sparse(maxlen, {v:1.0
>> for v in l}),VectorUDT())
>>
>> alldat_idx.cache()
>>
>> feature_vec_len = (alldat_idx.select(max('indexedSegs')).first()[0] + 1)
>>
>> print("alldat_dix: {}".format(str(alldat_idx.count(
>>
>> feature_df = (alldat_idx
>> .withColumn('label',alldat_idx['label_val'].cast('double'))
>> .groupBy('id','label')
>>
>> .agg(sort_array(collect_list('indexedSegs')).alias('collect_list_is'))
>> .withColumn('num_feat',lit(feature_vec_len))
>>
>> .withColumn('features',list_to_sparse_udf('collect_list_is','num_feat'))
>> .drop('collect_list_is')
>> .drop('num_feat'))
>>
>> feature_df.cache()
>> print("Num in featuredf: {}".format(str(feature_df.count(  ## <-
>> failure occurs here
>>
>> #
>>
>> Here, however, I always run out of memory on the executors (I've twiddled
>> driver and executor memory to check) and YARN kills off my containers. I've
>> gone as high as --executor-memory 15g but it still doesn't help.
>>
>> Given the number of segments is at most 50,000 I'm surprised that a
>> smallish row-wise operation is enough to blow up the process.
>>
>>
>> Is it really the UDF that's killing me? Do I have to rewrite it in Scala?
>>
>>
>>
>>
>>
>> Query p

Re: Memory problems with simple ETL in Pyspark

2017-04-15 Thread ayan guha
ect_list_is#197]
>+- Project [id#0L, label_val#99, segment#2, indexedSegs#93,
> cast(label_val#99 as double) AS label#183]
>   +- Project [id#0L, label#1 AS label_val#99, segment#2,
> indexedSegs#93]
>  +- Project [id#0L, label#1, segment#2,
> UDF(cast(segment#2 as string)) AS indexedSegs#93]
> +- MetastoreRelation pmccarthy, all_data_long
>
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#265L]
> +- Project [id#0L, label#183, features#208]
>+- Project [id#0L, label#183, num_feat#202, features#208]
>   +- Project [id#0L, label#183, collect_list_is#197, num_feat#202,
> (collect_list_is#197, num_feat#202) AS features#208]
>  +- Project [id#0L, label#183, collect_list_is#197, 56845.0 AS
> num_feat#202]
> +- Aggregate [id#0L, label#183], [id#0L, label#183,
> sort_array(collect_list(indexedSegs#93, 0, 0), true) AS collect_list_is#197]
>+- Project [id#0L, label_val#99, segment#2, indexedSegs#93,
> cast(label_val#99 as double) AS label#183]
>   +- Project [id#0L, label#1 AS label_val#99, segment#2,
> indexedSegs#93]
>  +- Project [id#0L, label#1, segment#2,
> UDF(cast(segment#2 as string)) AS indexedSegs#93]
> +- MetastoreRelation pmccarthy, all_data_long
>
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#265L]
> +- Project
>+- InMemoryRelation [id#0L, label#183, features#208], true, 1,
> StorageLevel(disk, memory, deserialized, 1 replicas)
>  +- *Project [id#0L, label#183, pythonUDF0#244 AS features#208]
> +- BatchEvalPython [(collect_list_is#197, 56845.0)],
> [id#0L, label#183, collect_list_is#197, pythonUDF0#244]
>+- SortAggregate(key=[id#0L, label#183],
> functions=[collect_list(indexedSegs#93, 0, 0)], output=[id#0L, label#183,
> collect_list_is#197])
>   +- *Sort [id#0L ASC NULLS FIRST, label#183 ASC NULLS
> FIRST], false, 0
>  +- Exchange hashpartitioning(id#0L, label#183, 200)
> +- *Project [id#0L, indexedSegs#93,
> cast(label_val#99 as double) AS label#183]
>+- InMemoryTableScan [id#0L, indexedSegs#93,
> label_val#99]
>  +- InMemoryRelation [id#0L, label_val#99,
> segment#2, indexedSegs#93], true, 1, StorageLevel(disk, memory,
> deserialized, 1 replicas)
>+- *Project [id#0L, label#1 AS
> label_val#99, segment#2, UDF(segment#2) AS indexedSegs#93]
>   +- HiveTableScan [id#0L,
> label#1, segment#2], MetastoreRelation pmccarthy, all_data_long
>
> == Physical Plan ==
> *HashAggregate(keys=[], functions=[count(1)], output=[count#265L])
> +- Exchange SinglePartition
>+- *HashAggregate(keys=[], functions=[partial_count(1)],
> output=[count#284L])
>   +- InMemoryTableScan
> +- InMemoryRelation [id#0L, label#183, features#208], true,
> 1, StorageLevel(disk, memory, deserialized, 1 replicas)
>   +- *Project [id#0L, label#183, pythonUDF0#244 AS
> features#208]
>  +- BatchEvalPython [(collect_list_is#197,
> 56845.0)], [id#0L, label#183, collect_list_is#197, pythonUDF0#244]
> +- SortAggregate(key=[id#0L, label#183],
> functions=[collect_list(indexedSegs#93, 0, 0)], output=[id#0L, label#183,
> collect_list_is#197])
>+- *Sort [id#0L ASC NULLS FIRST, label#183 ASC
> NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(id#0L,
> label#183, 200)
>  +- *Project [id#0L, indexedSegs#93,
> cast(label_val#99 as double) AS label#183]
> +- InMemoryTableScan [id#0L,
> indexedSegs#93, label_val#99]
>   +- InMemoryRelation [id#0L,
> label_val#99, segment#2, indexedSegs#93], true, 1, StorageLevel(disk,
> memory, deserialized, 1 replicas)
> +- *Project [id#0L,
> label#1 AS label_val#99, segment#2, UDF(segment#2) AS indexedSegs#93]
>+- HiveTableScan
> [id#0L, label#1, segment#2], MetastoreRelation pmccarthy, all_data_long
>
>
> --
Best Regards,
Ayan Guha


Re: checkpoint

2017-04-13 Thread ayan guha
Looks like your UDF expects numeric data but you are sending it string type.
I suggest casting to numeric.
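
A rough sketch of that cast, reusing the names from the quoted mail below
(dfTotaleNormalize23, listrapcot, udf_Grappra); whether 'double' is the right
target type is an assumption:

    from pyspark.sql import functions as F

    # cast the string column to a numeric type before handing it to the UDF
    dfTotaleNormalize24 = dfTotaleNormalize23.select(
        [udf_Grappra(F.col(i).cast('double')).alias(i) if i in listrapcot else i
         for i in dfTotaleNormalize23.columns])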

On Thu, 13 Apr 2017 at 7:03 pm, issues solution <issues.solut...@gmail.com>
wrote:

> Hi
> I am new to Spark and I want to ask what is wrong with checkpointing on
> PySpark 1.6.0.
>
> I don't understand what happens after I try to use it on a dataframe:
> dfTotaleNormalize24 = dfTotaleNormalize23.select([i if i not in
> listrapcot else udf_Grappra(F.col(i)).alias(i) for i in
> dfTotaleNormalize23.columns])
>
> dfTotaleNormalize24.cache()   <- cache in memory
> dfTotaleNormalize24.count()   <- materialize the dataframe (the RDD too?)
> dfTotaleNormalize24.rdd.checkpoint()   <- cut the DAG; the RDD is not saved yet
> dfTotaleNormalize24.rdd.count()   <- materialize to file
>
> but why do I get the following error:
>
>  java.lang.UnsupportedOperationException: Cannot evaluate expression:
>  PythonUDF#Grappra(input[410, StringType])
>
>
> Thanks for explaining all the details and steps to save and checkpoint.
>
> My dataframe is huge, with more than 5 million rows and 1000 columns,
>
> and the UDF above is applied to more than 150 columns; all it does is
> replace ' ' with 0.0.
>
> regards
>
-- 
Best Regards,
Ayan Guha


Re: is there a way to persist the lineages generated by spark?

2017-04-03 Thread ayan guha
How about storing the logical plans (or toDebugString, in the case of an RDD) to an
external file on the driver?
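
A rough PySpark sketch of that idea (the output path is arbitrary, and _jdf is
an internal handle, so treat this as illustrative only):

    # DataFrame: capture the parsed/analyzed/optimized/physical plans as text
    plan_text = df._jdf.queryExecution().toString()

    # RDD: the lineage string is available directly
    rdd_lineage = rdd.toDebugString()

    with open('/tmp/lineage_snapshot.txt', 'w') as f:
        f.write(plan_text)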

On Tue, Apr 4, 2017 at 1:19 PM, kant kodali <kanth...@gmail.com> wrote:

> Hi All,
>
> I am wondering if there a way to persist the lineages generated by spark
> underneath? Some of our clients want us to prove if the result of the
> computation that we are showing on a dashboard is correct and for that If
> we can show the lineage of transformations that are executed to get to the
> result then that can be the Q.E.D moment but I am not even sure if this is
> even possible with spark?
>
> Thanks,
> kant
>



-- 
Best Regards,
Ayan Guha


Re: Spark SQL 2.1 Complex SQL - Query Planning Issue

2017-03-30 Thread ayan guha
I think there is an option of pinning execution plans in memory to avoid
such scenarios

On Fri, Mar 31, 2017 at 1:25 PM, Sathish Kumaran Vairavelu <
vsathishkuma...@gmail.com> wrote:

> Hi Everyone,
>
> I have complex SQL with approx 2000 lines of code and works with 50+
> tables with 50+ left joins and transformations. All the tables are fully
> cached in Memory with sufficient storage memory and working memory. The
> issue is after the launch of the query for the execution; the query takes
> approximately 40 seconds to appear in the Jobs/SQL in the application UI.
>
> While the execution takes only 25 seconds; the execution is delayed by 40
> seconds by the scheduler so the total runtime of the query becomes 65
> seconds(40s + 25s). Also, there are enough cores available during this wait
> time. I couldn't figure out why DAG scheduler is delaying the execution by
> 40 seconds. Is this due to time taken for Query Parsing and Query Planning
> for the Complex SQL? If thats the case; how do we optimize this Query
> Parsing and Query Planning time in Spark? Any help would be helpful.
>
>
> Thanks
>
> Sathish
>



-- 
Best Regards,
Ayan Guha


Re: Need help for RDD/DF transformation.

2017-03-30 Thread ayan guha
Is it possible for one key to be in two groups in RDD/DF 2? For example:

[1,2,3]
[1,4,5]

On Thu, 30 Mar 2017 at 12:23 pm, Mungeol Heo <mungeol@gmail.com> wrote:

> Hello Yong,
>
> First of all, thank your attention.
> Note that the values of elements, which have values at RDD/DF1, in the
> same list will be always same.
> Therefore, the "1" and "3", which from RDD/DF 1, will always have the
> same value which is "a".
>
> The goal here is assigning same value to elements of the list which
> does not exist in RDD/DF 1.
> So, all the elements in the same list can have same value.
>
> Or, the final RDD/DF also can be like this,
>
> [1, 2, 3], a
> [4, 5], b
>
> Thank you again.
>
> - Mungeol
>
>
> On Wed, Mar 29, 2017 at 9:03 PM, Yong Zhang <java8...@hotmail.com> wrote:
> > What is the desired result for
> >
> >
> > RDD/DF 1
> >
> > 1, a
> > 3, c
> > 5, b
> >
> > RDD/DF 2
> >
> > [1, 2, 3]
> > [4, 5]
> >
> >
> > Yong
> >
> > 
> > From: Mungeol Heo <mungeol@gmail.com>
> > Sent: Wednesday, March 29, 2017 5:37 AM
> > To: user@spark.apache.org
> > Subject: Need help for RDD/DF transformation.
> >
> > Hello,
> >
> > Suppose, I have two RDD or data frame like addressed below.
> >
> > RDD/DF 1
> >
> > 1, a
> > 3, a
> > 5, b
> >
> > RDD/DF 2
> >
> > [1, 2, 3]
> > [4, 5]
> >
> > I need to create a new RDD/DF like below from RDD/DF 1 and 2.
> >
> > 1, a
> > 2, a
> > 3, a
> > 4, b
> > 5, b
> >
> > Is there an efficient way to do this?
> > Any help will be great.
> >
> > Thank you.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
Best Regards,
Ayan Guha


Re: calculate diff of value and median in a group

2017-03-22 Thread ayan guha
For median, use percentile_approx with 0.5 (50th percentile is the median)
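
A rough PySpark sketch of that, reusing the group1/group2/value schema from
Craig's code further down the thread:

    from pyspark.sql import functions as F

    # approximate median per (group1, group2)
    medians = (df.groupBy('group1', 'group2')
                 .agg(F.expr('percentile_approx(value, 0.5)').alias('median_value')))

    # join the medians back and compute the per-row difference
    final = (df.join(medians, ['group1', 'group2'])
               .withColumn('diff', F.abs(F.col('value') - F.col('median_value'))))
    final.show()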

On Thu, Mar 23, 2017 at 11:01 AM, Yong Zhang <java8...@hotmail.com> wrote:

> He is looking for median, not mean/avg.
>
>
> You have to implement the median logic by yourself, as there is no
> directly implementation from Spark. You can use RDD API, if you are using
> 1.6.x, or dataset if 2.x
>
>
> The following example gives you an idea how to calculate the median using
> dataset API. You can even change the code to add additional logic to
> calculate the diff of every value with the median.
>
>
> scala> spark.version
> res31: String = 2.1.0
>
> scala> val ds = Seq((100,0.43),(100,0.33),(100,0.73),(101,0.29),(101,0.96),
> (101,0.42),(101,0.01)).toDF("id","value").as[(Int, Double)]
> ds: org.apache.spark.sql.Dataset[(Int, Double)] = [id: int, value: double]
>
> scala> ds.show
> +---+-+
> | id|value|
> +---+-+
> |100| 0.43|
> |100| 0.33|
> |100| 0.73|
> |101| 0.29|
> |101| 0.96|
> |101| 0.42|
> |101| 0.01|
> +---+-+
>
> scala> def median(seq: Seq[Double]) = {
>  |   val size = seq.size
>  |   val sorted = seq.sorted
>  |   size match {
>  | case even if size % 2 == 0 => (sorted((size-2)/2) + 
> sorted(size/2)) / 2
>  | case odd => sorted((size-1)/2)
>  |   }
>  | }
> median: (seq: Seq[Double])Double
>
> scala> ds.groupByKey(_._1).mapGroups((id, iter) => (id, 
> median(iter.map(_._2).toSeq))).show
> +---+-+
> | _1|   _2|
> +---+-+
> |101|0.355|
> |100| 0.43|
> +---+-+
>
>
> Yong
>
>
>
>
> --
> *From:* ayan guha <guha.a...@gmail.com>
> *Sent:* Wednesday, March 22, 2017 7:23 PM
> *To:* Craig Ching
> *Cc:* Yong Zhang; user@spark.apache.org
> *Subject:* Re: calculate diff of value and median in a group
>
> I would suggest using a window function with partitioning.
>
> select group1,group2,name,value, avg(value) over (partition by group1,group2
> order by name) m
> from t
>
> On Thu, Mar 23, 2017 at 9:58 AM, Craig Ching <craigch...@gmail.com> wrote:
>
>> Are the elements count big per group? If not, you can group them and use
>> the code to calculate the median and diff.
>>
>>
>> They're not big, no.  Any pointers on how I might do that?  The part I'm
>> having trouble with is the grouping, I can't seem to see how to do the
>> median per group.  For mean, we have the agg feature, but not for median
>> (and I understand the reasons for that).
>>
>> Yong
>>
>> --
>> *From:* Craig Ching <craigch...@gmail.com>
>> *Sent:* Wednesday, March 22, 2017 3:17 PM
>> *To:* user@spark.apache.org
>> *Subject:* calculate diff of value and median in a group
>>
>> Hi,
>>
>> When using pyspark, I'd like to be able to calculate the difference
>> between grouped values and their median for the group.  Is this possible?
>> Here is some code I hacked up that does what I want except that it
>> calculates the grouped diff from mean.  Also, please feel free to comment
>> on how I could make this better if you feel like being helpful :)
>>
>> from pyspark import SparkContext
>> from pyspark.sql import SparkSession
>> from pyspark.sql.types import (
>> StringType,
>> LongType,
>> DoubleType,
>> StructField,
>> StructType
>> )
>> from pyspark.sql import functions as F
>>
>>
>> sc = SparkContext(appName='myapp')
>> spark = SparkSession(sc)
>>
>> file_name = 'data.csv'
>>
>> fields = [
>> StructField(
>> 'group2',
>> LongType(),
>> True),
>> StructField(
>> 'name',
>> StringType(),
>> True),
>> StructField(
>> 'value',
>> DoubleType(),
>> True),
>> StructField(
>> 'group1',
>> LongType(),
>> True)
>> ]
>> schema = StructType(fields)
>>
>> df = spark.read.csv(
>> file_name, header=False, mode="DROPMALFORMED", schema=schema
>> )
>> df.show()
>> means = df.select([
>> 'group1',
>> 'group2',
>> 'name',
>> 'value']).groupBy([
>> 'group1',
>> 'group2'
>> ]).agg(
>> F.mean('value').alias('mean_value')
>> ).orderBy('group1', 'group2')
>>
>> cond = [df.group1 == means.group1, df.group2 == means.group2]
>>
>> means.show()

Re: calculate diff of value and median in a group

2017-03-22 Thread ayan guha
I would suggest using a window function with partitioning.

select group1,group2,name,value, avg(value) over (partition by group1,group2
order by name) m
from t

On Thu, Mar 23, 2017 at 9:58 AM, Craig Ching <craigch...@gmail.com> wrote:

> Are the elements count big per group? If not, you can group them and use
> the code to calculate the median and diff.
>
>
> They're not big, no.  Any pointers on how I might do that?  The part I'm
> having trouble with is the grouping, I can't seem to see how to do the
> median per group.  For mean, we have the agg feature, but not for median
> (and I understand the reasons for that).
>
> Yong
>
> --
> *From:* Craig Ching <craigch...@gmail.com>
> *Sent:* Wednesday, March 22, 2017 3:17 PM
> *To:* user@spark.apache.org
> *Subject:* calculate diff of value and median in a group
>
> Hi,
>
> When using pyspark, I'd like to be able to calculate the difference
> between grouped values and their median for the group.  Is this possible?
> Here is some code I hacked up that does what I want except that it
> calculates the grouped diff from mean.  Also, please feel free to comment
> on how I could make this better if you feel like being helpful :)
>
> from pyspark import SparkContext
> from pyspark.sql import SparkSession
> from pyspark.sql.types import (
> StringType,
> LongType,
> DoubleType,
> StructField,
> StructType
> )
> from pyspark.sql import functions as F
>
>
> sc = SparkContext(appName='myapp')
> spark = SparkSession(sc)
>
> file_name = 'data.csv'
>
> fields = [
> StructField(
> 'group2',
> LongType(),
> True),
> StructField(
> 'name',
> StringType(),
> True),
> StructField(
> 'value',
> DoubleType(),
> True),
> StructField(
> 'group1',
> LongType(),
> True)
> ]
> schema = StructType(fields)
>
> df = spark.read.csv(
> file_name, header=False, mode="DROPMALFORMED", schema=schema
> )
> df.show()
> means = df.select([
> 'group1',
> 'group2',
> 'name',
> 'value']).groupBy([
> 'group1',
> 'group2'
> ]).agg(
> F.mean('value').alias('mean_value')
> ).orderBy('group1', 'group2')
>
> cond = [df.group1 == means.group1, df.group2 == means.group2]
>
> means.show()
> df = df.select([
> 'group1',
> 'group2',
> 'name',
> 'value']).join(
> means,
> cond
> ).drop(
> df.group1
> ).drop(
> df.group2
> ).select('group1',
>  'group2',
>  'name',
>  'value',
>  'mean_value')
>
> final = df.withColumn(
> 'diff',
> F.abs(df.value - df.mean_value))
> final.show()
>
> sc.stop()
>
> And here is an example dataset I'm playing with:
>
> 100,name1,0.43,0
> 100,name2,0.33,0
> 100,name3,0.73,0
> 101,name1,0.29,0
> 101,name2,0.96,0
> 101,name3,0.42,0
> 102,name1,0.01,0
> 102,name2,0.42,0
> 102,name3,0.51,0
> 103,name1,0.55,0
> 103,name2,0.45,0
> 103,name3,0.02,0
> 104,name1,0.93,0
> 104,name2,0.16,0
> 104,name3,0.74,0
> 105,name1,0.41,0
> 105,name2,0.65,0
> 105,name3,0.29,0
> 100,name1,0.51,1
> 100,name2,0.51,1
> 100,name3,0.43,1
> 101,name1,0.59,1
> 101,name2,0.55,1
> 101,name3,0.84,1
> 102,name1,0.01,1
> 102,name2,0.98,1
> 102,name3,0.44,1
> 103,name1,0.47,1
> 103,name2,0.16,1
> 103,name3,0.02,1
> 104,name1,0.83,1
> 104,name2,0.89,1
> 104,name3,0.31,1
> 105,name1,0.59,1
> 105,name2,0.77,1
> 105,name3,0.45,1
>
> and here is what I'm trying to produce:
>
> group1,group2,name,value,median,diff
> 0,100,name1,0.43,0.43,0.0
> 0,100,name2,0.33,0.43,0.10
> 0,100,name3,0.73,0.43,0.30
> 0,101,name1,0.29,0.42,0.13
> 0,101,name2,0.96,0.42,0.54
> 0,101,name3,0.42,0.42,0.0
> 0,102,name1,0.01,0.42,0.41
> 0,102,name2,0.42,0.42,0.0
> 0,102,name3,0.51,0.42,0.09
> 0,103,name1,0.55,0.45,0.10
> 0,103,name2,0.45,0.45,0.0
> 0,103,name3,0.02,0.45,0.43
> 0,104,name1,0.93,0.74,0.19
> 0,104,name2,0.16,0.74,0.58
> 0,104,name3,0.74,0.74,0.0
> 0,105,name1,0.41,0.41,0.0
> 0,105,name2,0.65,0.41,0.24
> 0,105,name3,0.29,0.41,0.24
> 1,100,name1,0.51,0.51,0.0
> 1,100,name2,0.51,0.51,0.0
> 1,100,name3,0.43,0.51,0.08
> 1,101,name1,0.59,0.59,0.0
> 1,101,name2,0.55,0.59,0.04
> 1,101,name3,0.84,0.59,0.25
> 1,102,name1,0.01,0.44,0.43
> 1,102,name2,0.98,0.44,0.54
> 1,102,name3,0.44,0.44,0.0
> 1,103,name1,0.47,0.16,0.31
> 1,103,name2,0.16,0.16,0.0
> 1,103,name3,0.02,0.16,0.14
> 1,104,name1,0.83,0.83,0.0
> 1,104,name2,0.89,0.83,0.06
> 1,104,name3,0.31,0.83,0.52
> 1,105,name1,0.59,0.59,0.0
> 1,105,name2,0.77,0.59,0.18
> 1,105,name3,0.45,0.59,0.14
>
> Thanks for any help!
>
> Cheers,
> Craig
>
>


-- 
Best Regards,
Ayan Guha


Re: Local spark context on an executor

2017-03-21 Thread ayan guha
For JDBC to work, you can start spark-submit with appropriate jdbc driver
jars (using --jars), then you will have the driver available on executors.

For acquiring connections, create a singleton connection per executor. I
think the DataFrame JDBC reader (sqlContext.read.jdbc) already takes care of
it.

Finally, if you want multiple MySQL tables to be accessed in a single Spark
job, you can create a list of tables and run a map over that list. Something
like:

def getTable(tableName: String): DataFrame
def saveTable(d: DataFrame): Unit

val tables = sc.parallelize(tableNames)
tables.map(getTable).foreach(saveTable)
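
For a single table, the read/save round trip would look roughly like this in
PySpark (URL, credentials, and paths are placeholders):

    df = sqlContext.read.jdbc(
        url='jdbc:mysql://mysql-host:3306/mydb',
        table='some_table',
        properties={'user': 'user', 'password': 'password',
                    'driver': 'com.mysql.jdbc.Driver'})
    df.write.parquet('hdfs:///data/some_table')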

On Wed, Mar 22, 2017 at 9:41 AM, Shashank Mandil <mandil.shash...@gmail.com>
wrote:

> I am using spark to dump data from mysql into hdfs.
> The way I am doing this is by creating a spark dataframe with the metadata
> of different mysql tables to dump from multiple mysql hosts and then
> running a map over that data frame to dump each mysql table data into hdfs
> inside the executor.
>
> The reason I want spark context is that I would like to use spark jdbc to
> be able to read the mysql table and then the spark writer to be able to
> write to hdfs.
>
> Thanks,
> Shashank
>
> On Tue, Mar 21, 2017 at 3:37 PM, ayan guha <guha.a...@gmail.com> wrote:
>
>> What is your use case? I am sure there must be a better way to solve
>> it
>>
>> On Wed, Mar 22, 2017 at 9:34 AM, Shashank Mandil <
>> mandil.shash...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I am using spark in a yarn cluster mode.
>>> When I run a yarn application it creates multiple executors on the
>>> hadoop datanodes for processing.
>>>
>>> Is it possible for me to create a local spark context (master=local) on
>>> these executors to be able to get a spark context ?
>>>
>>> Theoretically since each executor is a java process this should be
>>> doable isn't it ?
>>>
>>> Thanks,
>>> Shashank
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


-- 
Best Regards,
Ayan Guha


Re: Local spark context on an executor

2017-03-21 Thread ayan guha
What is your use case? I am sure there must be a better way to solve it

On Wed, Mar 22, 2017 at 9:34 AM, Shashank Mandil <mandil.shash...@gmail.com>
wrote:

> Hi All,
>
> I am using spark in a yarn cluster mode.
> When I run a yarn application it creates multiple executors on the hadoop
> datanodes for processing.
>
> Is it possible for me to create a local spark context (master=local) on
> these executors to be able to get a spark context ?
>
> Theoretically since each executor is a java process this should be doable
> isn't it ?
>
> Thanks,
> Shashank
>



-- 
Best Regards,
Ayan Guha


Re: spark-sql use case beginner question

2017-03-08 Thread ayan guha
Hi

Subject to your versions of Hive & Spark, you may want to set
hive.execution.engine=spark as a beeline command-line parameter, assuming you
are running the hive scripts using the beeline command line (which is the
suggested practice for security purposes).



On Thu, Mar 9, 2017 at 2:09 PM, nancy henry <nancyhenry6...@gmail.com>
wrote:

>
> Hi Team,
>
> Basically we have all our data as Hive tables, and until now we have been
> processing them in Hive on MR. Now that we have HiveContext, which can run
> Hive queries on Spark, we are making all these complex Hive scripts run
> through a hivecontext.sql(sc.textFile(hivescript)) kind of approach, i.e.
> basically running Hive queries on Spark without coding anything in Scala
> yet. Even so, just running the Hive queries on Spark already shows a big
> difference in runtime compared to running them on MR.
>
> So, since we already have the Hive scripts, should we just make those
> complex scripts run through hc.sql, as hc.sql is able to do it?
>
> Or is this not best practice? Even though Spark can do it, is it still
> better to load all those individual Hive tables in Spark, make RDDs, and
> write Scala code to get the same functionality we have in Hive?
>
> It is becoming difficult for us to choose whether to leave it to hc.sql to
> do the work of running the complex scripts, or whether we have to code in
> Scala. Will the effort of that manual rework be worth it in terms of
> performance?
>
> ex of our sample scripts
> use db;
> create tempfunction1 as com.fgh.jkl.TestFunction;
>
> create destable in hive;
> insert overwrite desttable select (big complex transformations and usage
> of hive udfs)
> from table1,table2,table3 join table4 on some condition complex and join
> table 7 on another complex condition where complex filtering
>
> So please help: what would be the best approach, and why should we not give the
> entire script to HiveContext to make its own RDDs and run on Spark, if we
> are able to do it?
>
> Because all the examples I see online only show hc.sql("select * from
> table1") and nothing more complex than that.
>
>
>


-- 
Best Regards,
Ayan Guha


Re: Failed to connect to master ...

2017-03-07 Thread ayan guha
You need to start Master and worker processes before connecting to them.

On Wed, Mar 8, 2017 at 3:33 PM, Mina Aslani <aslanim...@gmail.com> wrote:

> Hi,
>
> I am writing a spark Transformer in intelliJ in Java and trying to connect
> to the spark in a VM using setMaster. I get "Failed to connect to master
> ..."
>
> I get 17/03/07 16:20:55 WARN StandaloneAppClient$ClientEndpoint: Failed
> to connect to master VM_IPAddress:7077
> org.apache.spark.SparkException: Exception thrown in awaitResult
> at org.apache.spark.rpc.RpcTimeout$$anonfun$1.
> applyOrElse(RpcTimeout.scala:77)
> at org.apache.spark.rpc.RpcTimeout$$anonfun$1.
> applyOrElse(RpcTimeout.scala:75)
> at scala.runtime.AbstractPartialFunction.apply(
> AbstractPartialFunction.scala:36)
> at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.
> applyOrElse(RpcTimeout.scala:59)
>
> SparkSession spark = SparkSession
>   .builder()
>   .appName("Java Spark SQL")
>   //.master("local[1]")
>   .master("spark://VM_IPAddress:7077")
>   .getOrCreate();
>
> Dataset lines = spark
>   .readStream()
>   .format("kafka")  .option("kafka.bootstrap.servers", brokers)  
> .option("subscribe", topic)  .load()
>   .selectExpr("CAST(value AS STRING)")  .as(Encoders.STRING());
>
>
>
> I get the same error when I try master("spark://spark-master:7077").
>
> However, with .master("local[1]") no exception is thrown.
>
> My Kafka is in the same VM and, being new to Spark, I am still trying to understand:
>
> - Why do I get the above exception, and how can I fix it (connect to Spark in the VM and
> read from Kafka in the VM)?
>
> - Why is no exception thrown when using "local[1]", and how do I set it up to read from
> Kafka in the VM?
>
> - How do I stream from Kafka (the data in the topic is in JSON format)?
>
> Your input is appreciated!
>
> Best regards,
> Mina
>
>
>
>


-- 
Best Regards,
Ayan Guha


Re: finding Spark Master

2017-03-07 Thread ayan guha
Use yarn-client or yarn-cluster.
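
In code that would look roughly like this (Spark 1.x style; on newer versions
you would normally pass --master yarn to spark-submit instead of hard-coding
the master):

    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setAppName('example').setMaster('yarn-client')
    sc = SparkContext(conf=conf)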

On Wed, 8 Mar 2017 at 10:28 am, Adaryl Wakefield <
adaryl.wakefi...@hotmail.com> wrote:

> I’m running a three-node cluster with Spark and Hadoop as part of an HDP
> stack. How do I find my Spark Master? I’m just seeing the
> clients. I’m trying to figure out what goes in setMaster() aside from
> local[*].
>
>
>
> Adaryl "Bob" Wakefield, MBA
> Principal
> Mass Street Analytics, LLC
> 913.938.6685
>
> www.massstreet.net
>
> www.linkedin.com/in/bobwakefieldmba
> Twitter: @BobLovesData
>
>
>
-- 
Best Regards,
Ayan Guha


Re: Spark Beginner: Correct approach for use case

2017-03-05 Thread ayan guha
Any specific reason to choose Spark? It sounds like you have a
Write-Once-Read-Many Times dataset, which is logically partitioned across
customers, sitting in some data store. And essentially you are looking for
a fast way to access it, and most likely you will use the same partition
key for quering the data. This is more of a database/NoSQL kind of use case
than Spark (which is more of distributed processing engine,I reckon).

On Mon, Mar 6, 2017 at 11:56 AM, Subhash Sriram <subhash.sri...@gmail.com>
wrote:

> Hi Allan,
>
> Where is the data stored right now? If it's in a relational database, and
> you are using Spark with Hadoop, I feel like it would make sense to move
> the import the data into HDFS, just because it would be faster to access
> the data. You could use Sqoop to do that.
>
> In terms of having a long running Spark context, you could look into the
> Spark job server:
>
> https://github.com/spark-jobserver/spark-jobserver/blob/master/README.md
>
> It would allow you to cache all the data in memory and then accept queries
> via REST API calls. You would have to refresh your cache as the data
> changes of course, but it sounds like that is not very often.
>
> In terms of running the queries themselves, I would think you could use
> Spark SQL and the DataFrame/DataSet API, which is built into Spark. You
> will have to think about the best way to partition your data, depending on
> the queries themselves.
>
> Here is a link to the Spark SQL docs:
>
> http://spark.apache.org/docs/latest/sql-programming-guide.html
>
> I hope that helps, and I'm sure other folks will have some helpful advice
> as well.
>
> Thanks,
> Subhash
>
> Sent from my iPhone
>
> On Mar 5, 2017, at 3:49 PM, Allan Richards <allan.richa...@gmail.com>
> wrote:
>
> Hi,
>
> I am looking to use Spark to help execute queries against a reasonably
> large dataset (1 billion rows). I'm a bit lost with all the different
> libraries / add ons to Spark, and am looking for some direction as to what
> I should look at / what may be helpful.
>
> A couple of relevant points:
>  - The dataset doesn't change over time.
>  - There are a small number of applications (or queries I guess, but it's
> more complicated than a single SQL query) that I want to run against it,
> but the parameters to those queries will change all the time.
>  - There is a logical grouping of the data per customer, which will
> generally consist of 1-5000 rows.
>
> I want each query to run as fast as possible (less than a second or two).
> So ideally I want to keep all the records in memory, but distributed over
> the different nodes in the cluster. Does this mean sharing a SparkContext
> between queries, or is this where HDFS comes in, or is there something else
> that would be better suited?
>
> Or is there another overall approach I should look into for executing
> queries in "real time" against a dataset this size?
>
> Thanks,
> Allan.
>
>


-- 
Best Regards,
Ayan Guha


Re: [RDDs and Dataframes] Equivalent expressions for RDD API

2017-03-05 Thread ayan guha
Just as a best practice, DataFrames and Datasets are the preferred way, so try not
to resort to RDDs unless you absolutely have to...

On Sun, 5 Mar 2017 at 7:10 pm, khwunchai jaengsawang <khwuncha...@ku.th>
wrote:

> Hi Old-Scool,
>
>
> For the first question, you can specify the number of partition in any
> DataFrame by using
> repartition(numPartitions: Int, partitionExprs: Column*).
> *Example*:
> val partitioned = data.repartition(numPartitions=10).cache()
>
> For your second question, you can transform your RDD into PairRDD and use
> reduceByKey()
> *Example:*
> val pairs = data.map(row => (row(1), row(2))).reduceByKey(_+_)
>
>
> Best,
>
>   Khwunchai Jaengsawang
>   *Email*: khwuncha...@ku.th
>   LinkedIn <https://linkedin.com/in/khwunchai> | Github
> <https://github.com/khwunchai>
>
>
> On Mar 4, 2560 BE, at 8:59 PM, Old-School <giorgos_myrianth...@outlook.com>
> wrote:
>
> Hi,
>
> I want to perform some simple transformations and check the execution time,
> under various configurations (e.g. number of cores being used, number of
> partitions etc). Since it is not possible to set the partitions of a
> dataframe , I guess that I should probably use RDDs.
>
> I've got a dataset with 3 columns as shown below:
>
> val data = file.map(line => line.split(" "))
>  .filter(lines => lines.length == 3) // ignore first line
>  .map(row => (row(0), row(1), row(2)))
>  .toDF("ID", "word-ID", "count")
> results in:
>
> +--++-+
> | ID |  word-ID   |  count   |
> +--++-+
> |  15   |87  |   151|
> |  20   |19  |   398|
> |  15   |19  |   21  |
> |  180 |90  |   190|
> +---+-+
> So how can I turn the above into an RDD in order to use e.g.
> sc.parallelize(data, 10) and set the number of partitions to say 10?
>
> Furthermore, I would also like to ask about the equivalent expression
> (using
> RDD API) for the following simple transformation:
>
> data.select("word-ID",
> "count").groupBy("word-ID").agg(sum($"count").as("count")).show()
>
>
>
> Thanks in advance
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/RDDs-and-Dataframes-Equivalent-expressions-for-RDD-API-tp28455.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
> --
Best Regards,
Ayan Guha


Re: Spark 2.0 issue with left_outer join

2017-03-04 Thread ayan guha
How about running this -

select * from
(select *, count(*) over (partition by id) cnt from filteredDS) f
where f.cnt < 75000


On Sun, Mar 5, 2017 at 12:05 PM, Ankur Srivastava <
ankur.srivast...@gmail.com> wrote:

> Yes every time I run this code with production scale data it fails. Test
> case with small dataset of 50 records on local box runs fine.
>
> Thanks
> Ankur
>
> Sent from my iPhone
>
> On Mar 4, 2017, at 12:09 PM, ayan guha <guha.a...@gmail.com> wrote:
>
> Just to be sure, can you reproduce the error using sql api?
>
> On Sat, 4 Mar 2017 at 2:32 pm, Ankur Srivastava <
> ankur.srivast...@gmail.com> wrote:
>
>> Adding DEV.
>>
>> Or is there any other way to do subtractByKey using Dataset APIs?
>>
>> Thanks
>> Ankur
>>
>> On Wed, Mar 1, 2017 at 1:28 PM, Ankur Srivastava <
>> ankur.srivast...@gmail.com> wrote:
>>
>> Hi Users,
>>
>> We are facing an issue with left_outer join using Spark Dataset api in
>> 2.0 Java API. Below is the code we have
>>
>> Dataset badIds = filteredDS.groupBy(col("id").alias("bid")).count()
>> .filter((FilterFunction) row -> (Long) row.getAs("count") > 
>> 75000);
>> _logger.info("Id count with over 75K records that will be filtered: " + 
>> badIds.count());
>>
>> Dataset fiteredRows = filteredDS.join(broadcast(badIds), 
>> filteredDS.col("id").equalTo(badDevices.col("bid")), "left_outer")
>> .filter((FilterFunction) row ->  row.getAs("bid") == null)
>> .map((MapFunction<Row, SomeData>) row -> 
>> SomeDataFactory.createObjectFromDDRow(row), Encoders.bean(DeviceData.class));
>>
>>
>> We get the counts in the log file and then the application fils with
>> below exception
>> Exception in thread "main" java.lang.UnsupportedOperationException: Only
>> code-generated evaluation is supported.
>> at org.apache.spark.sql.catalyst.expressions.objects.Invoke.
>> eval(objects.scala:118)
>> at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.
>> org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$
>> canFilterOutNull(joins.scala:109)
>> at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$
>> anonfun$7.apply(joins.scala:118)
>> at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$
>> anonfun$7.apply(joins.scala:118)
>> at scala.collection.LinearSeqOptimized$class.
>> exists(LinearSeqOptimized.scala:93)
>> at scala.collection.immutable.List.exists(List.scala:84)
>> at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.
>> org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$
>> buildNewJoinType(joins.scala:118)
>> at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$
>> anonfun$apply$2.applyOrElse(joins.scala:133)
>> at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$
>> anonfun$apply$2.applyOrElse(joins.scala:131)
>> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.
>> apply(TreeNode.scala:279)
>> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.
>> apply(TreeNode.scala:279)
>> at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.
>> withOrigin(TreeNode.scala:69)
>> at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(
>> TreeNode.scala:278)
>> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$
>> transformDown$1.apply(TreeNode.scala:284)
>> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$
>> transformDown$1.apply(TreeNode.scala:284)
>> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.
>> apply(TreeNode.scala:321)
>> at org.apache.spark.sql.catalyst.trees.TreeNode.
>> mapProductIterator(TreeNode.scala:179)
>> at org.apache.spark.sql.catalyst.trees.TreeNode.
>> transformChildren(TreeNode.scala:319)
>> at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(
>> TreeNode.scala:284)
>> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$
>> transformDown$1.apply(TreeNode.scala:284)
>> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$
>> transformDown$1.apply(TreeNode.scala:284)
>> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.
>> apply(TreeNode.scala:321)
>> at org.apache.spark.sql.catalyst.trees.TreeNode.
>> mapProductI

Re: Spark 2.0 issue with left_outer join

2017-03-04 Thread ayan guha
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:284)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:268)
> at
> org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.apply(joins.scala:131)
> at
> org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.apply(joins.scala:98)
> at
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
> at
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
> at
> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
> at
> scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
> at
> scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
> at
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
> at
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at
> org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
> at
> org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:74)
> at
> org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:74)
> at
> org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:78)
> at
> org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:76)
> at
> org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:83)
> at
> org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:83)
> at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2555)
> at org.apache.spark.sql.Dataset.count(Dataset.scala:2226)
> at test.Driver.main(Driver.java:106)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
> at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
> at
> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> Thanks
> Ankur
>
>
> --
Best Regards,
Ayan Guha


Re: DataFrame from in memory datasets in multiple JVMs

2017-02-28 Thread ayan guha
How about parallelizing each subset and then unioning them all into one DataFrame?
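
Roughly, in PySpark (subsets_from_jvms, schema, spark and sc are placeholder
names for things your application would supply):

    from functools import reduce

    # parallelize each in-memory subset, then union the resulting DataFrames
    frames = [spark.createDataFrame(sc.parallelize(subset), schema)
              for subset in subsets_from_jvms]
    combined = reduce(lambda a, b: a.union(b), frames)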

On Wed, 1 Mar 2017 at 3:07 am, Sean Owen <so...@cloudera.com> wrote:

> Broadcasts let you send one copy of read only data to each executor.
> That's not the same as a DataFrame and itseems nature means it doesnt make
> sense to think of them as not distributed. But consider things like
> broadcast hash joins which may be what you are looking for if you really
> mean to join on a small DF efficiently.
>
> On Tue, Feb 28, 2017, 16:03 johndesuv <desu...@gmail.com> wrote:
>
> Hi,
>
> I have an application that runs on a series of JVMs that each contain a
> subset of a large dataset in memory.  I'd like to use this data in spark
> and
> am looking at ways to use this as a data source in spark without writing
> the
> data to disk as a handoff.
>
> Parallelize doesn't work for me since I need to use the data across all the
> JVMs as one DataFrame.
>
> The only option I've come up with so far is to write a custom DataSource
> that then transmits the data from each of the JVMs over the network.  This
> seems like overkill though.
>
> Is there a simpler solution for getting this data into a DataFrame?
>
> Thanks,
> John
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-from-in-memory-datasets-in-multiple-JVMs-tp28438.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -----
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
Best Regards,
Ayan Guha


Re: SPark - YARN Cluster Mode

2017-02-27 Thread ayan guha
Hi

Thanks a lot, I used a properties file to resolve the issue. I think the
documentation should mention it, though.

On Tue, 28 Feb 2017 at 5:05 am, Marcelo Vanzin <van...@cloudera.com> wrote:

> >  none of my Config settings
>
> Is it none of the configs or just the queue? You can't set the YARN
> queue in cluster mode through code, it has to be set in the command
> line. It's a chicken & egg problem (in cluster mode, the YARN app is
> created before your code runs).
>
>  --properties-file works the same as setting options on the command
> line, so you can use that instead.
>
>
> On Sun, Feb 26, 2017 at 4:52 PM, ayan guha <guha.a...@gmail.com> wrote:
> > Hi
> >
> > I am facing an issue with Cluster Mode, with pyspark
> >
> > Here is my code:
> >
> > conf = SparkConf()
> > conf.setAppName("Spark Ingestion")
> > conf.set("spark.yarn.queue","root.Applications")
> > conf.set("spark.executor.instances","50")
> > conf.set("spark.executor.memory","22g")
> > conf.set("spark.yarn.executor.memoryOverhead","4096")
> > conf.set("spark.executor.cores","4")
> > conf.set("spark.sql.hive.convertMetastoreParquet", "false")
> > sc = SparkContext(conf = conf)
> > sqlContext = HiveContext(sc)
> >
> > r = sc.parallelize(xrange(1,1))
> > print r.count()
> >
> > sc.stop()
> >
> > The problem is none of my Config settings are passed on to Yarn.
> >
> > spark-submit --master yarn --deploy-mode cluster ayan_test.py
> >
> > I tried the same code with deploy-mode=client and all config are passing
> > fine.
> >
> > Am I missing something? Will introducing --properties-file be of any help?
> Can
> > anybody share some working example?
> >
> > Best
> > Ayan
> >
> > --
> > Best Regards,
> > Ayan Guha
>
>
>
> --
> Marcelo
>
-- 
Best Regards,
Ayan Guha


Re: SPark - YARN Cluster Mode

2017-02-26 Thread ayan guha
Also, I wanted to add that if I specify the conf on the command line, it seems
to work.

For example, if I use

spark-submit --master yarn --deploy-mode cluster --conf
spark.yarn.queue=root.Application ayan_test.py 10

Then it is going to correct queue.

Any help would be great

Best
Ayan

On Mon, Feb 27, 2017 at 11:52 AM, ayan guha <guha.a...@gmail.com> wrote:

> Hi
>
> I am facing an issue with Cluster Mode, with pyspark
>
> Here is my code:
>
> conf = SparkConf()
> conf.setAppName("Spark Ingestion")
> conf.set("spark.yarn.queue","root.Applications")
> conf.set("spark.executor.instances","50")
> conf.set("spark.executor.memory","22g")
> conf.set("spark.yarn.executor.memoryOverhead","4096")
> conf.set("spark.executor.cores","4")
> conf.set("spark.sql.hive.convertMetastoreParquet", "false")
> sc = SparkContext(conf = conf)
> sqlContext = HiveContext(sc)
>
> r = sc.parallelize(xrange(1,1))
> print r.count()
>
> sc.stop()
>
> The problem is none of my Config settings are passed on to Yarn.
>
> spark-submit --master yarn --deploy-mode cluster ayan_test.py
>
> I tried the same code with deploy-mode=client and all config are passing
> fine.
>
> Am I missing something? Will introducing --properties-file be of any help?
> Can anybody share some working example?
>
> Best
> Ayan
>
> --
> Best Regards,
> Ayan Guha
>



-- 
Best Regards,
Ayan Guha


SPark - YARN Cluster Mode

2017-02-26 Thread ayan guha
Hi

I am facing an issue with Cluster Mode, with pyspark

Here is my code:

conf = SparkConf()
conf.setAppName("Spark Ingestion")
conf.set("spark.yarn.queue","root.Applications")
conf.set("spark.executor.instances","50")
conf.set("spark.executor.memory","22g")
conf.set("spark.yarn.executor.memoryOverhead","4096")
conf.set("spark.executor.cores","4")
conf.set("spark.sql.hive.convertMetastoreParquet", "false")
sc = SparkContext(conf = conf)
sqlContext = HiveContext(sc)

r = sc.parallelize(xrange(1,1))
print r.count()

sc.stop()

The problem is none of my Config settings are passed on to Yarn.

spark-submit --master yarn --deploy-mode cluster ayan_test.py

I tried the same code with deploy-mode=client and all config are passing
fine.

Am I missing something? Will introducing --properties-file be of any help?
Can anybody share some working example?

Best
Ayan

-- 
Best Regards,
Ayan Guha


Re: Spark runs out of memory with small file

2017-02-26 Thread ayan guha
>>>> In [8]: rdd3.count()
>>>> Out[8]: 508310
>>>>
>>>> In [9]: rdd3.take(1)
>>>> Out[9]: [Row(hdfs://ip-172-31-35-67.us-west-2.compute.internal:8020/
>>>> mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.in
>>>> ternal.warc.gz='WARC/1.0\r')]
>>>>
>>>> In [10]: def process_file(s):
>>>> ...: text = s[1]
>>>> ...: d = {}
>>>> ...: l =  text.split("\n")
>>>> ...: final = []
>>>> ...: the_id = "init"
>>>> ...: for line in l:
>>>> ...: if line[0:15] == 'WARC-Record-ID:':
>>>> ...: the_id = line[15:]
>>>> ...: d[the_id] = line
>>>>     ...: final.append(Row(**d))
>>>> ...: return final
>>>>
>>>> In [12]: rdd2 = rdd1.map(process_file)
>>>>
>>>> In [13]: rdd2.count()
>>>> 17/02/25 19:03:03 ERROR YarnScheduler: Lost executor 5 on
>>>> ip-172-31-41-89.us-west-2.compute.internal: Container killed by YARN
>>>> for exceeding memory limits. 10.3 GB of 10.3 GB physical memory used.
>>>> Consider boosting spark.yarn.executor.memoryOverhead.
>>>> 17/02/25 19:03:03 WARN YarnSchedulerBackend$YarnSchedulerEndpoint:
>>>> Container killed by YARN for exceeding memory limits. 10.3 GB of 10.3 GB
>>>> physical memory used. Consider boosting spark.yarn.executor.memoryOver
>>>> head.
>>>> 17/02/25 19:03:03 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID
>>>> 5, ip-172-31-41-89.us-west-2.compute.internal, executor 5):
>>>> ExecutorLostFailure (executor 5 exited caused by one of the running tasks)
>>>> Reason: Container killed by YARN for exceeding memory limits. 10.3 GB of
>>>> 10.3 GB physical memory used. Consider boosting
>>>> spark.yarn.executor.memoryOverhead.
>>>>
>>>>
>>>> --
>>>> Henry Tremblay
>>>> Robert Half Technology
>>>>
>>>>
>>>> -
>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>
>>>>
>>>
>>> --
>>> Henry Tremblay
>>> Robert Half Technology
>>>
>>>
>>
>


-- 
Best Regards,
Ayan Guha


Re: CSV DStream to Hive

2017-02-21 Thread ayan guha
I am afraid your requirement is not very clear. Can you post some example
data and what output are you expecting?


On Wed, 22 Feb 2017 at 9:13 am, nimrodo <nimrod.o...@veracity-group.com>
wrote:

> Hi all,
>
> I have a DStream that contains very long comma separated values. I want to
> convert this DStream to a DataFrame. I thought of using split on the RDD
> and
> toDF however I can't get it to work.
>
> Can anyone help me here?
>
> Nimrod
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/CSV-DStream-to-Hive-tp28410.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
Best Regards,
Ayan Guha


Re: How to query a query with not contain, not start_with, not end_with condition effective?

2017-02-21 Thread ayan guha
The first thing I would do is add DISTINCT to both the inner and outer queries.
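
For example, via spark.sql (the same SQL can be run directly in Zeppelin),
assuming the dataset is registered as the table "data" as in the quoted mail:

    deduped = spark.sql("""
        SELECT DISTINCT user_id FROM data
        WHERE user_id NOT IN (
            SELECT DISTINCT user_id FROM data WHERE url LIKE '%sell%')
    """)
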
On Tue, 21 Feb 2017 at 8:56 pm, Chanh Le <giaosu...@gmail.com> wrote:

> Hi everyone,
>
> I am working on a dataset like this:
>
> user_id  url
> 1        lao.com/buy
> 2        bao.com/sell
> 2        cao.com/market
> 1        lao.com/sell
> 3        vui.com/sell
>
> I have to find all user_id whose url does not contain sell. Which means I
> need to query all user_id that contain sell, put them into a set, and then do
> another query to find all user_id not in that set:
>
> SELECT user_id FROM data
> WHERE user_id NOT IN (SELECT user_id FROM data WHERE url LIKE '%sell%');
>
> My data is about 20 million records and it's growing. When I tried this in
> Zeppelin I needed to set spark.sql.crossJoin.enabled = true.
> Then I ran the query, the driver hit an extremely high CPU percentage, and
> the process got stuck, so I had to kill it.
> I am running in client mode, submitting to a Mesos cluster.
>
> I am using Spark 2.0.2 and my data is stored in HDFS in Parquet format.
> Any advice for me in this situation?
>
> Thank you in advance!
> Thank you in advance!.
>
> Regards,
> Chanh
>
-- 
Best Regards,
Ayan Guha


Re: Basic Grouping Question

2017-02-20 Thread ayan guha
Hi

Once you specify the aggregates in the groupBy function (I am assuming you
mean a DataFrame here?), both the grouping and the aggregation work in a
distributed fashion (you may want to look into how reduceByKey and/or
aggregateByKey work).
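
For example, with the Code/timestamp/value layout from the quoted mail below,
both of these run the per-code work in a distributed way (avg and the sum are
only illustrative aggregates, not necessarily the map/reduce logic you need):

    from pyspark.sql import functions as F

    # DataFrame route: aggregation per Code
    per_code = df.groupBy('Code').agg(F.avg('value').alias('avg_value'))

    # RDD route: values are pre-combined on each partition before the shuffle
    per_code_rdd = (df.rdd
                      .map(lambda r: (r['Code'], r['value']))
                      .reduceByKey(lambda a, b: a + b))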

On Mon, Feb 20, 2017 at 10:23 PM, Marco Mans <ma...@telemans.de> wrote:

> Hi!
>
> I'm new to Spark and trying to write my first spark job on some data I
> have.
> The data is in this (parquet) format:
>
> Code,timestamp, value
> A, 2017-01-01, 123
> A, 2017-01-02, 124
> A, 2017-01-03, 126
> B, 2017-01-01, 127
> B, 2017-01-02, 126
> B, 2017-01-03, 123
>
> I want to write a little map-reduce application that must be run on each
> 'code'.
> So I would need to group the data on the 'code' column and than execute
> the map and the reduce steps on each code; 2 times in this example, A and B.
>
> But when I group the data (groupBy-function), it returns a
> RelationalDatasetGroup. On this I cannot apply the map and reduce function.
>
> I have the feeling that I am running in the wrong direction. Does anyone
> know how to approach this? (I hope I explained it right, so it can be
> understand :))
>
> Regards,
> Marco
>



-- 
Best Regards,
Ayan Guha


Re: [Spark Streaming] Starting Spark Streaming application from a specific position in Kinesis stream

2017-02-19 Thread ayan guha
Hi

AFAIK, Kinesis does not provide any mechanism other than the checkpoint to
restart from. That makes sense, as it keeps things generic.

Question: why can't you warm up your data from a data store? Say every 30
minutes you run a job to aggregate your data to a data store for that hour.
When you restart the streaming app it would read from the Dynamo checkpoint,
but it would also prepare an initial RDD from the data store.

Best
Ayan
On Sun, 19 Feb 2017 at 8:29 pm, Neil Maheshwari <neil.v.maheshw...@gmail.com>
wrote:

> Hello,
>
> I am building a Spark streaming application that ingests data from an
> Amazon Kinesis stream. My application keeps track of the minimum price over
> a window for groups of similar tickets. When I deploy the application, I
> would like it to start processing at the start of the previous hours data.
> This will warm up the state of the application and allow us to deploy our
> application faster. For example, if I start the application at 3 PM, I
> would like to process the data retained by Kinesis from 2PM to 3PM, and
> then continue receiving data going forward. Spark Streaming’s Kinesis
> receiver, which relies on the Amazon Kinesis Client Library, seems to give
> me three options for choosing where to read from the stream:
>
>- read from the latest checkpointed sequence number in Dynamo
>- start from the oldest record in the stream (TRIM_HORIZON shard
>iterator type)
>- start from the most recent record in the stream (LATEST shard
>iterator type)
>
>
> Do you have any suggestions on how we could start our application at a
> specific timestamp or sequence number in the Kinesis stream? Some ideas I
> had were:
>
>- Create a KCL application that fetches the previous hour data and
>writes it to HDFS. We can create an RDD from that dataset and initialize
>our Spark Streaming job with it. The spark streaming job’s Kinesis receiver
>can have the same name as the initial KCL application, and use that
>applications checkpoint as the starting point. We’re writing our spark jobs
>in Python, so this would require launching the java MultiLang daemon, or
>writing that portion of the application in Java/Scala.
>- Before the Spark streaming application starts, we could fetch a
>shard iterator using the AT_TIMESTAMP shard iterator type. We could record
>the sequence number of the first record returned by this iterator, and
>create an entry in Dynamo for our application for that sequence number. Our
>Kinesis receiver would pick up from this checkpoint. It makes me a little
>nervous that we would be faking Kinesis Client Library's protocol by
>writing a checkpoint into Dynamo
>
>
> Thanks in advance!
>
> Neil
>
-- 
Best Regards,
Ayan Guha


Re: Spark Thrift Server - Skip header when load data from local file

2017-02-14 Thread ayan guha
I doubt you can do that. Create a staging table and then insert into the main
table after filtering out the header.
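
A rough sketch of that workaround, driven through spark.sql rather than beeline,
assuming Hive support is enabled and the header's first field is the literal
column name 'id':

    spark.sql("create table tabname_stage(id string, name string) "
              "row format delimited fields terminated by ','")
    spark.sql("load data local inpath 'tabname.csv' overwrite into table tabname_stage")
    # drop the header row while copying into the real table
    spark.sql("insert overwrite table tabname "
              "select * from tabname_stage where id <> 'id'")
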
On Wed, 15 Feb 2017 at 4:01 pm, kumar r <kumarc...@gmail.com> wrote:

> Hi,
>
> I want to load data from csv file to Spark table using Spark thrift
> server. When i load, header(first line) should be ignored.
>
> I have used tblproperties("skip.header.line.count"="1") option. But its
> not working and first line also included.
>
> Below is spark sql query i have tried.
>
> create table tabname(id string,name string) row format delimited fields
> terminated by ',' tblproperties("skip.header.line.count"="1");
> load data local inpath 'tabname.csv' overwrite into table tabname;
>
> How can i achieve this? Is there any other solution or workaround.
>
-- 
Best Regards,
Ayan Guha


Re: Etl with spark

2017-02-12 Thread ayan guha
You can store the list of keys (I believe you use them in the source file path,
right?) in a file, one key per line. Then you can read that file using
sc.textFile (so you will get an RDD of file paths) and then apply your
function as a map.

r = sc.textFile(list_file).map(your_function)

HTH

On Sun, Feb 12, 2017 at 10:04 PM, Sam Elamin <hussam.ela...@gmail.com>
wrote:

> Hey folks
>
> Really simple question here. I currently have an etl pipeline that reads
> from s3 and saves the data to an endstore
>
>
> I have to read from a list of keys in s3 but I am doing a raw extract then
> saving. Only some of the extracts have a simple transformation but overall
> the code looks the same
>
>
> I abstracted away this logic into a method that takes in an s3 path does
> the common transformations and saves to source
>
>
> But the job takes about 10 mins or so because I'm iteratively going down a
> list of keys
>
> Is it possible to asynchronously do this?
>
> FYI I'm using spark.read.json to read from s3 because it infers my schema
>
> Regards
> Sam
>



-- 
Best Regards,
Ayan Guha


Re: Remove dependence on HDFS

2017-02-12 Thread ayan guha
How about adding more NFS storage?

On Sun, 12 Feb 2017 at 8:14 pm, Sean Owen <so...@cloudera.com> wrote:

> Data has to live somewhere -- how do you not add storage but store more
> data?  Alluxio is not persistent storage, and S3 isn't on your premises.
>
> On Sun, Feb 12, 2017 at 4:29 AM Benjamin Kim <bbuil...@gmail.com> wrote:
>
> Has anyone got some advice on how to remove the reliance on HDFS for
> storing persistent data. We have an on-premise Spark cluster. It seems like
> a waste of resources to keep adding nodes because of a lack of storage
> space only. I would rather add more powerful nodes due to the lack of
> processing power at a less frequent rate, than add less powerful nodes at a
> more frequent rate just to handle the ever growing data. Can anyone point
> me in the right direction? Is Alluxio a good solution? S3? I would like to
> hear your thoughts.
>
> Cheers,
> Ben
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
Best Regards,
Ayan Guha


Re: Un-exploding / denormalizing Spark SQL help

2017-02-08 Thread ayan guha
> > +---+----+------+------+---------+------+------+---------+------+------+---------+
> > | id|name|extra1| data1|priority1|extra2| data2|priority2|extra3| data3|priority3|
> > +---+----+------+------+---------+------+------+---------+------+------+---------+
> > |  1|Fred|     8|value1|        1|     8|value8|        2|     8|value5|        3|
> > |  2| Amy|     9|value3|        1|     9|value5|        2|  null|  null|     null|
> > +---+----+------+------+---------+------+------+---------+------+------+---------+
> >
> >
> >
> >
> >
> >
> >
> >>
> >>
> >> Jacek
> >>
> >> On 7 Feb 2017 8:02 p.m., "Everett Anderson" <ever...@nuna.com.invalid>
> >> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I'm trying to un-explode or denormalize a table like
> >>>
> >>> +---++-+--++
> >>> |id |name|extra|data  |priority|
> >>> +---++-+--++
> >>> |1  |Fred|8|value1|1   |
> >>> |1  |Fred|8|value8|2   |
> >>> |1  |Fred|8|value5|3   |
> >>> |2  |Amy |9|value3|1   |
> >>> |2  |Amy |9|value5|2   |
> >>> +---++-+--++
> >>>
> >>> into something that looks like
> >>>
> >>>
> >>>
> >>> +---+----+------+------+---------+------+------+---------+------+------+---------+
> >>> |id |name|extra1|data1 |priority1|extra2|data2 |priority2|extra3|data3 |priority3|
> >>> +---+----+------+------+---------+------+------+---------+------+------+---------+
> >>> |1  |Fred|8     |value1|1        |8     |value8|2        |8     |value5|3        |
> >>> |2  |Amy |9     |value3|1        |9     |value5|2        |null  |null  |null     |
> >>> +---+----+------+------+---------+------+------+---------+------+------+---------+
> >>>
> >>> If I were going the other direction, I'd create a new column with an
> >>> array of structs, each with 'extra', 'data', and 'priority' fields and
> then
> >>> explode it.
> >>>
> >>> Going from the more normalized view, though, I'm having a harder time.
> >>>
> >>> I want to group or partition by (id, name) and order by priority, but
> >>> after that I can't figure out how to get multiple rows rotated into
> one.
> >>>
> >>> Any ideas?
> >>>
> >>> Here's the code to create the input table above:
> >>>
> >>> import org.apache.spark.sql.Row
> >>> import org.apache.spark.sql.Dataset
> >>> import org.apache.spark.sql.types._
> >>>
> >>> val rowsRDD = sc.parallelize(Seq(
> >>> Row(1, "Fred", 8, "value1", 1),
> >>> Row(1, "Fred", 8, "value8", 2),
> >>> Row(1, "Fred", 8, "value5", 3),
> >>> Row(2, "Amy", 9, "value3", 1),
> >>> Row(2, "Amy", 9, "value5", 2)))
> >>>
> >>> val schema = StructType(Seq(
> >>> StructField("id", IntegerType, nullable = true),
> >>> StructField("name", StringType, nullable = true),
> >>> StructField("extra", IntegerType, nullable = true),
> >>> StructField("data", StringType, nullable = true),
> >>> StructField("priority", IntegerType, nullable = true)))
> >>>
> >>> val data = sqlContext.createDataFrame(rowsRDD, schema)
> >>>
> >>>
> >>>
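
One way to rotate the rows, sketched here in PySpark rather than Scala and not taken from the thread, is to group by (id, name) and pivot on priority; the pivoted columns (1_extra, 1_data, ...) can then be renamed to extra1, data1, ...:

from pyspark.sql import functions as F

# `data` is the normalized DataFrame built above (id, name, extra, data, priority)
wide = (data.groupBy("id", "name")
            .pivot("priority", [1, 2, 3])   # listing the values keeps the pivot cheap
            .agg(F.first("extra").alias("extra"),
                 F.first("data").alias("data")))
# resulting columns: id, name, 1_extra, 1_data, 2_extra, 2_data, 3_extra, 3_data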
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>
>
> --
Best Regards,
Ayan Guha


Re: [Spark Context]: How to add on demand jobs to an existing spark context?

2017-02-07 Thread ayan guha
I think you are looking for Livy or Spark JobServer.
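
For reference, a rough sketch of reusing one SparkContext through Livy's REST API (the host, port and submitted code are placeholders; Spark JobServer follows a similar REST model):

import requests

livy = "http://livy-host:8998"   # placeholder

# one interactive session == one long-lived SparkContext
sess = requests.post(livy + "/sessions", json={"kind": "pyspark"}).json()
# (in real use, poll GET /sessions/<id> until its state is "idle")

# submit as many "jobs" as needed as statements against that same session
stmt = {"code": "sc.parallelize(range(4), 5).map(str).count()"}
requests.post("{0}/sessions/{1}/statements".format(livy, sess["id"]), json=stmt)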
On Wed, 8 Feb 2017 at 12:37 am, Cosmin Posteuca <cosmin.poste...@gmail.com>
wrote:

> I want to run different jobs on demand with same spark context, but i
> don't know how exactly i can do this.
>
> I try to get current context, but seems it create a new spark context(with
> new executors).
>
> I call spark-submit to add new jobs.
>
> I run code on Amazon EMR(3 instances, 4 core & 16GB ram / instance), with
> yarn as resource manager.
>
> My code:
>
> val sparkContext = SparkContext.getOrCreate()
> val content = 1 to 4
> val result = sparkContext.parallelize(content, 5)
> result.map(value => value.toString).foreach(loop)
>
> def loop(x: String): Unit = {
>for (a <- 1 to 3000) {
>
>}
> }
>
> spark-submit:
>
> spark-submit --executor-cores 1 \
>  --executor-memory 1g \
>  --driver-memory 1g \
>  --master yarn \
>  --deploy-mode cluster \
>  --conf spark.dynamicAllocation.enabled=true \
>  --conf spark.shuffle.service.enabled=true \
>  --conf spark.dynamicAllocation.minExecutors=1 \
>  --conf spark.dynamicAllocation.maxExecutors=3 \
>  --conf spark.dynamicAllocation.initialExecutors=3 \
>  --conf spark.executor.instances=3 \
>
> If i run twice spark-submit it create 6 executors, but i want to run all
> this jobs on same spark application.
>
> How can achieve adding jobs to an existing spark application?
>
> I don't understand why SparkContext.getOrCreate() don't get existing
> spark context.
>
>
> Thanks,
>
> Cosmin P.
>
-- 
Best Regards,
Ayan Guha


Re: specifing schema on dataframe

2017-02-06 Thread ayan guha
If I am not missing anything here, "So I know which columns are numeric and
which aren't because I have a StructType and all the internal StructFields
will tell me which ones have a DataType which is numeric and which aren't"
already gives you the list of fields that should be numeric.

Essentially, you build that list of numeric fields from your "should-be"
struct type. Then you load the raw data with the built-in JSON reader; at that
point the data has the wrong schema. To correct it, loop over the list of
numeric fields (or directly over the struct type) and compare each field's
type; wherever there is a mismatch, add a withColumn clause that casts the
column to the correct data type (taken from the "should-be" struct).

HTH?

Best
Ayan
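
A minimal PySpark sketch of that loop, assuming a hypothetical "should-be" schema (the field names and input path are placeholders):

from pyspark.sql.types import (StructType, StructField, LongType, DoubleType,
                               StringType, NumericType)

# hypothetical "should-be" schema
target_schema = StructType([
    StructField("customerid", LongType()),
    StructField("amount", DoubleType()),
    StructField("foo", StringType()),
])

# parse schema: same field names, but everything read as string
parse_schema = StructType([StructField(f.name, StringType()) for f in target_schema])

df = spark.read.schema(parse_schema).json("input.json")

# cast every field whose target type is numeric
for f in target_schema:
    if isinstance(f.dataType, NumericType):
        df = df.withColumn(f.name, df[f.name].cast(f.dataType))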

On Mon, Feb 6, 2017 at 8:00 PM, Sam Elamin <hussam.ela...@gmail.com> wrote:

> Yup sorry I should have explained myself better
>
> So I know which columns are numeric and which arent because I have a
> StructType and all the internal StructFields will tell me which ones have a
> DataType which is numeric and which arent
>
> So assuming I have a json string which has double quotes on numbers when
> it shouldnt, and I have the correct schema in a struct type
>
>
> how can I iterate over them to programatically create the new dataframe in
> the correct format
>
> do i iterate over the columns in the StructType? or iterate over the
> columns in the dataframe and try to match them with the StructType?
>
> I hope I cleared things up, What I wouldnt do for a drawing board right
> now!
>
>
> On Mon, Feb 6, 2017 at 8:56 AM, ayan guha <guha.a...@gmail.com> wrote:
>
>> UmmI think the premise is you need to "know" beforehand which columns
>> are numeric.Unless you know it, how would you apply the schema?
>>
>> On Mon, Feb 6, 2017 at 7:54 PM, Sam Elamin <hussam.ela...@gmail.com>
>> wrote:
>>
>>> Thanks ayan but I meant how to derive the list automatically
>>>
>>> In your example you are specifying the numeric columns and I would like
>>> it to be applied to any schema if that makes sense
>>> On Mon, 6 Feb 2017 at 08:49, ayan guha <guha.a...@gmail.com> wrote:
>>>
>>>> SImple (pyspark) example:
>>>>
>>>> >>> df = sqlContext.read.json("/user/l_aguha/spark_qs.json")
>>>> >>> df.printSchema()
>>>> root
>>>>  |-- customerid: string (nullable = true)
>>>>  |-- foo: string (nullable = true)
>>>>
>>>> >>> numeric_field_list = ['customerid']
>>>>
>>>> >>> for k in numeric_field_list:
>>>> ... df = df.withColumn(k,df[k].cast("long"))
>>>> ...
>>>> >>> df.printSchema()
>>>> root
>>>>  |-- customerid: long (nullable = true)
>>>>  |-- foo: string (nullable = true)
>>>>
>>>>
>>>> On Mon, Feb 6, 2017 at 6:56 PM, Sam Elamin <hussam.ela...@gmail.com>
>>>> wrote:
>>>>
>>>> Ok thanks Micheal!
>>>>
>>>>
>>>> Can I get an idea on where to start? Assuming I have the end schema and
>>>> the current dataframe...
>>>> How can I loop through it and create a new dataframe using the
>>>> WithColumn?
>>>>
>>>>
>>>> Am I iterating through the dataframe or the schema?
>>>>
>>>> I'm assuming it's easier to iterate through the columns in the old df.
>>>> For each column cast it correctly and generate a new df?
>>>>
>>>>
>>>> Would you recommend that?
>>>>
>>>> Regards
>>>> Sam
>>>> On Mon, 6 Feb 2017 at 01:12, Michael Armbrust <mich...@databricks.com>
>>>> wrote:
>>>>
>>>> If you already have the expected schema, and you know that all numbers
>>>> will always be formatted as strings in the input JSON, you could probably
>>>> derive this list automatically.
>>>>
>>>> Wouldn't it be simpler to just regex replace the numbers to remove the
>>>> quotes?
>>>>
>>>>
>>>> I think this is likely to be a slower and less robust solution.  You
>>>> would have to make sure that you got all the corner cases right (i.e.
>>>> escaping and what not).
>>>>
>>>> On Sun, Feb 5, 2017 at 3:13 PM, Sam Elamin <hussam.ela...@gmail.com>
>>>> wrote:
>>>>
>>>> I see so for the connect

Re: specifing schema on dataframe

2017-02-06 Thread ayan guha
Umm... I think the premise is that you need to "know" beforehand which columns
are numeric. Unless you know that, how would you apply the schema?

On Mon, Feb 6, 2017 at 7:54 PM, Sam Elamin <hussam.ela...@gmail.com> wrote:

> Thanks ayan but I meant how to derive the list automatically
>
> In your example you are specifying the numeric columns and I would like it
> to be applied to any schema if that makes sense
> On Mon, 6 Feb 2017 at 08:49, ayan guha <guha.a...@gmail.com> wrote:
>
>> SImple (pyspark) example:
>>
>> >>> df = sqlContext.read.json("/user/l_aguha/spark_qs.json")
>> >>> df.printSchema()
>> root
>>  |-- customerid: string (nullable = true)
>>  |-- foo: string (nullable = true)
>>
>> >>> numeric_field_list = ['customerid']
>>
>> >>> for k in numeric_field_list:
>> ... df = df.withColumn(k,df[k].cast("long"))
>> ...
>> >>> df.printSchema()
>> root
>>  |-- customerid: long (nullable = true)
>>  |-- foo: string (nullable = true)
>>
>>
>> On Mon, Feb 6, 2017 at 6:56 PM, Sam Elamin <hussam.ela...@gmail.com>
>> wrote:
>>
>> Ok thanks Micheal!
>>
>>
>> Can I get an idea on where to start? Assuming I have the end schema and
>> the current dataframe...
>> How can I loop through it and create a new dataframe using the WithColumn?
>>
>>
>> Am I iterating through the dataframe or the schema?
>>
>> I'm assuming it's easier to iterate through the columns in the old df.
>> For each column cast it correctly and generate a new df?
>>
>>
>> Would you recommend that?
>>
>> Regards
>> Sam
>> On Mon, 6 Feb 2017 at 01:12, Michael Armbrust <mich...@databricks.com>
>> wrote:
>>
>> If you already have the expected schema, and you know that all numbers
>> will always be formatted as strings in the input JSON, you could probably
>> derive this list automatically.
>>
>> Wouldn't it be simpler to just regex replace the numbers to remove the
>> quotes?
>>
>>
>> I think this is likely to be a slower and less robust solution.  You
>> would have to make sure that you got all the corner cases right (i.e.
>> escaping and what not).
>>
>> On Sun, Feb 5, 2017 at 3:13 PM, Sam Elamin <hussam.ela...@gmail.com>
>> wrote:
>>
>> I see so for the connector I need to pass in an array/list of numerical
>> columns?
>>
>> Wouldnt it be simpler to just regex replace the numbers to remove the
>> quotes?
>>
>>
>> Regards
>> Sam
>>
>> On Sun, Feb 5, 2017 at 11:11 PM, Michael Armbrust <mich...@databricks.com
>> > wrote:
>>
>> Specifying the schema when parsing JSON will only let you pick between
>> similar datatypes (i.e should this be a short, long float, double etc).  It
>> will not let you perform conversions like string <-> number.  This has to
>> be done with explicit casts after the data has been loaded.
>>
>> I think you can make a solution that uses select or withColumn generic.
>> Just load the dataframe with a "parse schema" that treats numbers as
>> strings.  Then construct a list of columns that should be numbers and apply
>> the necessary conversions.
>>
>> import org.apache.spark.sql.functions.col
>> var df = spark.read.schema(parseSchema).json("...")
>> numericColumns.foreach { columnName =>
>>   df = df.withColumn(columnName, col(columnName).cast("long"))
>> }
>>
>>
>>
>> On Sun, Feb 5, 2017 at 2:09 PM, Sam Elamin <hussam.ela...@gmail.com>
>> wrote:
>>
>> Thanks Micheal
>>
>> I've been spending the past few days researching this
>>
>> The problem is the generated json has double quotes on fields that are
>> numbers because the producing datastore doesn't want to lose precision
>>
>> I can change the data type true but that would be on specific to a job
>> rather than a generic streaming job. I'm writing a structured streaming
>> connector and I have the schema the generated dataframe should match.
>>
>> Unfortunately using withColumn won't help me here since the solution
>> needs to be generic
>>
>> To summarise assume I have the following json
>>
>> [{
>> "customerid": "535137",
>> "foo": "bar"
>> }]
>>
>>
>> and I know the schema should be:
>> StructType(Array(StructField("customerid",LongType,true),
>> StructField(&q

Re: specifing schema on dataframe

2017-02-06 Thread ayan guha
>>
>> On Sat, Feb 4, 2017 at 6:22 AM, Sam Elamin <hussam.ela...@gmail.com>
>> wrote:
>>
>> Hi Direceu
>>
>> Thanks your right! that did work
>>
>>
>> But now im facing an even bigger problem since i dont have access to
>> change the underlying data, I just want to apply a schema over something
>> that was written via the sparkContext.newAPIHadoopRDD
>>
>> Basically I am reading in a RDD[JsonObject] and would like to convert it
>> into a dataframe which I pass the schema into
>>
>> Whats the best way to do this?
>>
>> I doubt removing all the quotes in the JSON is the best solution is it?
>>
>> Regards
>> Sam
>>
>> On Sat, Feb 4, 2017 at 2:13 PM, Dirceu Semighini Filho <
>> dirceu.semigh...@gmail.com> wrote:
>>
>> Hi Sam
>> Remove the " from the number that it will work
>>
>> Em 4 de fev de 2017 11:46 AM, "Sam Elamin" <hussam.ela...@gmail.com>
>> escreveu:
>>
>> Hi All
>>
>> I would like to specify a schema when reading from a json but when trying
>> to map a number to a Double it fails, I tried FloatType and IntType with no
>> joy!
>>
>>
>> When inferring the schema customer id is set to String, and I would like
>> to cast it as Double
>>
>> so df1 is corrupted while df2 shows
>>
>>
>> Also FYI I need this to be generic as I would like to apply it to any
>> json, I specified the below schema as an example of the issue I am facing
>>
>> import org.apache.spark.sql.types.{BinaryType, StringType, StructField, 
>> DoubleType,FloatType, StructType, LongType,DecimalType}
>> val testSchema = StructType(Array(StructField("customerid",DoubleType)))
>> val df1 = 
>> spark.read.schema(testSchema).json(sc.parallelize(Array("""{"customerid":"535137"}""")))
>> val df2 = 
>> spark.read.json(sc.parallelize(Array("""{"customerid":"535137"}""")))
>> df1.show(1)
>> df2.show(1)
>>
>>
>> Any help would be appreciated, I am sure I am missing something obvious
>> but for the life of me I cant tell what it is!
>>
>>
>> Kind Regards
>> Sam
>>
>>
>>
>>
>>
>>
>>


-- 
Best Regards,
Ayan Guha


Re: filters Pushdown

2017-02-02 Thread ayan guha
Look for the Spark Packages website. If your questions were targeted at Hive,
then I think in general the answer to all of them is yes.
On Thu, 2 Feb 2017 at 9:23 pm, Peter Shmukler <pe...@varonis.com> wrote:

> Hi Vincent,
>
> Thank you for answer. (I don’t see your answer in mailing list, so I’m
> answering directly)
>
>
>
> What connectors can I work with from Spark?
>
> Can you provide any link to read about it because I see nothing in Spark
> documentation?
>
>
>
>
>
> *From:* vincent gromakowski [mailto:vincent.gromakow...@gmail.com]
> *Sent:* Thursday, February 2, 2017 12:12 PM
> *To:* Peter Shmukler <pe...@varonis.com>
> *Cc:* user@spark.apache.org
> *Subject:* Re: filters Pushdown
>
>
>
> Pushdowns depend on the source connector.
> Join pushdown with Cassandra only
> Filter pushdown with mainly all sources with some specific constraints
>
>
>
> Le 2 févr. 2017 10:42 AM, "Peter Sg" <pe...@varonis.com> a écrit :
>
> Can community help me to figure out some details about Spark:
> -   Does Spark support filter Pushdown for types:
>   o Int/long
>   o DateTime
>   o String
> -   Does Spark support Pushdown of join operations for partitioned
> tables (in
> case of join condition includes partitioning field)?
> -   Does Spark support Pushdown on Parquet, ORC ?
>   o Should I use Hadoop or NTFS/NFS is option was well?
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/filters-Pushdown-tp28357.html
> <https://urldefense.proofpoint.com/v2/url?u=http-3A__apache-2Dspark-2Duser-2Dlist.1001560.n3.nabble.com_filters-2DPushdown-2Dtp28357.html=DwMFaQ=7s4bs_giP1ngjwWhX4oayQ=kLWLAWGkyIRgjRCprqh7QX1OMFp1eBZjlRawqzDlMWc=Zss0q3yuZVzxFuqvPaXLIOHACrxzZOjevU-VE8Eeh04=dupzi0-PiyPLCmvPqwWSt2NaEE5hUKlbzmB4-NRuhfg=>
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
> This email and any attachments thereto may contain private, confidential,
> and privileged material for the sole use of the intended recipient. Any
> review, copying, or distribution of this email (or any attachments thereto)
> by others is strictly prohibited. If you are not the intended recipient,
> please contact the sender immediately and permanently delete the original
> and any copies of this email and any attachments thereto.
>
-- 
Best Regards,
Ayan Guha


Re: Text

2017-01-27 Thread ayan guha
I would not count on the order-preserving nature of those operations, because
it is not guaranteed. I would assign an order to the sentences and sort at
the end, before writing back.
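
A minimal PySpark sketch of that idea (the input path, output path and fix_words are hypothetical): tag each line with its original position, sort by that position after the transformation, then drop the index before writing.

rdd = sc.textFile("input.txt")                               # placeholder path

indexed = rdd.zipWithIndex()                                 # (line, original position)
fixed = indexed.map(lambda li: (li[1], fix_words(li[0])))    # fix_words is hypothetical

(fixed.sortByKey()              # restore the original order
      .map(lambda kv: kv[1])    # drop the index
      .saveAsTextFile("output_dir"))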

On Fri, 27 Jan 2017 at 10:59 pm, Md. Rezaul Karim <
rezaul.ka...@insight-centre.org> wrote:

> Some operations like map, filter, flatMap and coalesce (with
> shuffle=false) usually preserve the order. However, sortBy, reduceBy,
> partitionBy, join etc. do not.
>
> Regards,
> _
> *Md. Rezaul Karim*, BSc, MSc
> PhD Researcher, INSIGHT Centre for Data Analytics
> National University of Ireland, Galway
> IDA Business Park, Dangan, Galway, Ireland
> Web: http://www.reza-analytics.eu/index.html
> <http://139.59.184.114/index.html>
>
> On 27 January 2017 at 09:44, Soheila S. <soheila...@gmail.com> wrote:
>
> Hi All,
> I read a test file using sparkContext.textfile(filename) and assign it to
> an RDD and process the RDD (replace some words) and finally write it to
> a text file using rdd.saveAsTextFile(output).
> Is there any way to be sure the order of the sentences will not be
> changed? I need to have the same text with some corrected words.
>
> thanks!
>
> Soheila
>
>
> --
Best Regards,
Ayan Guha


Re: Kinesis streaming misunderstanding..?

2017-01-27 Thread ayan guha
Maybe a naive question: why are you creating one DStream per shard? Shouldn't
it be a single DStream corresponding to the Kinesis stream?

On Fri, Jan 27, 2017 at 8:09 PM, Takeshi Yamamuro <linguin@gmail.com>
wrote:

> Hi,
>
> Just a guess though, Kinesis shards sometimes have skew data.
> So, before you compute something from kinesis RDDs, you'd be better to
> repartition them
> for better parallelism.
>
> // maropu
>
> On Fri, Jan 27, 2017 at 2:54 PM, Graham Clark <grcl...@gmail.com> wrote:
>
>> Hi everyone - I am building a small prototype in Spark 1.6.0 (cloudera)
>> to read information from Kinesis and write it to HDFS in parquet format.
>> The write seems very slow, and if I understood Spark's diagnostics
>> correctly, always seemed to run from the same executor, one partition after
>> the other, serially. So I stripped the program down to this:
>>
>>
>> val kinesisStreams = (0 until numShards).map { i => {
>>
>>   KinesisUtils.createStream(streamingContext, sparkApplicationName,
>>
>> kinesisStreamName, kinesisUrl, awsRegion,
>> InitialPositionInStream.LATEST)
>>
>> new Duration(streamingInterval.millis),
>> StorageLevel.MEMORY_AND_DISK_SER,
>>
>> awsCredentials.accessKey, awsCredentials.secretKey)
>>
>> }}
>>
>> val allKinesisStreams = streamingContext.union(kinesisStreams)
>>
>> allKinesisStreams.foreachRDD {
>>
>>rdd => {
>>
>>   info("total for this batch is " + rdd.count())
>>
>>}
>> }
>>
>> The Kinesis stream has 20 shards (overprovisioned for this small test). I
>> confirmed using a small boto program that data is periodically written to
>> all 20 of the shards. I can see that Spark has created 20 executors, one
>> for each Kinesis shard. It also creates one other executor, tied to a
>> particular worker node, and that node seems to do the RDD counting. The
>> streaming interval is 1 minute, during which time several shards have
>> received data. Each minute interval, for this particular example, the
>> driver prints out between 20 and 30 for the count value. I expected to see
>> the count operation parallelized across the cluster. I think I must just be
>> misunderstanding something fundamental! Can anyone point out where I'm
>> going wrong?
>>
>> Yours in confusion,
>> Graham
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>



-- 
Best Regards,
Ayan Guha


Re: Oracle JDBC - Spark SQL - Key Not Found: Scale

2017-01-26 Thread ayan guha
Hi

I will do a little more testing and will let you know. It did not work with
INT and Number types, for sure.

While writing, everything is fine :)

On Fri, Jan 27, 2017 at 1:04 PM, Takeshi Yamamuro <linguin@gmail.com>
wrote:

> How about this?
> https://github.com/apache/spark/blob/master/sql/core/
> src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala#L729
> Or, how about using Double or something instead of Numeric?
>
> // maropu
>
> On Fri, Jan 27, 2017 at 10:25 AM, ayan guha <guha.a...@gmail.com> wrote:
>
>> Okay, it is working with varchar columns only. Is there any way to
>> workaround this?
>>
>> On Fri, Jan 27, 2017 at 12:22 PM, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> hi
>>>
>>> I thought so too, so I created a table with INT and Varchar columns
>>>
>>> desc agtest1
>>>
>>> Name Null Type
>>>   -
>>> PID   NUMBER(38)
>>> DES   VARCHAR2(100)
>>>
>>> url="jdbc:oracle:thin:@mpimpclu1-scan:1521/DEVAIM"
>>> table = "agtest1"
>>> user = "bal"
>>> password= "bal"
>>> driver="oracle.jdbc.OracleDriver"
>>> df = sqlContext.read.jdbc(url=url,table=table,properties={"user":
>>> user,"password":password,"driver":driver})
>>>
>>>
>>> Still the issue persists.
>>>
>>> On Fri, Jan 27, 2017 at 11:19 AM, Takeshi Yamamuro <
>>> linguin....@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I think you got this error because you used `NUMERIC` types in your
>>>> schema (https://github.com/apache/spark/blob/master/sql/core/src/ma
>>>> in/scala/org/apache/spark/sql/jdbc/OracleDialect.scala#L32). So, IIUC
>>>> avoiding the type is a workaround.
>>>>
>>>> // maropu
>>>>
>>>>
>>>> On Fri, Jan 27, 2017 at 8:18 AM, ayan guha <guha.a...@gmail.com> wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> I am facing exact issue with Oracle/Exadataas mentioned here
>>>>> <http://stackoverflow.com/questions/41873449/sparksql-key-not-found-scale>.
>>>>> Any idea? I could not figure out so sending to this grou hoping someone
>>>>> have see it (and solved it)
>>>>>
>>>>> Spark Version: 1.6
>>>>> pyspark command:
>>>>>
>>>>> pyspark --driver-class-path /opt/oracle/bigdatasql/bdcell-
>>>>> 12.1/jlib-bds/kvclient.jar:/opt/oracle/bigdatasql/bdcell-12.
>>>>> 1/jlib-bds/ojdbc7.jar:/opt/oracle/bigdatasql/bdcell-12.1/jli
>>>>> b-bds/ojdbc7-orig.jar:/opt/oracle/bigdatasql/bdcell-12.1/jli
>>>>> b-bds/oracle-hadoop-sql.jar:/opt/oracle/bigdatasql/bdcell-12
>>>>> .1/jlib-bds/ora-hadoop-common.jar:/opt/oracle/bigdatasql/bdc
>>>>> ell-12.1/jlib-bds/ora-hadoop-common-orig.jar:/opt/oracle/big
>>>>> datasql/bdcell-12.1/jlib-bds/orahivedp.jar:/opt/oracle/bigda
>>>>> tasql/bdcell-12.1/jlib-bds/orahivedp-orig.jar:/opt/oracle/
>>>>> bigdatasql/bdcell-12.1/jlib-bds/orai18n.jar:/opt/oracle/bigd
>>>>> atasql/bdcell-12.1/jlib-bds/orai18n-orig.jar:/opt/oracle/
>>>>> bigdatasql/bdcell-12.1/jlib-bds/oraloader.jar:/opt/oracle/
>>>>> bigdatasql/bdcell-12.1/jlib-bds/oraloader-orig.jar   --conf
>>>>> spark.jars=/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oracl
>>>>> e-hadoop-sql.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds
>>>>> /ora-hadoop-common.jar,/opt/oracle/bigdatasql/bdcell-12.1/jl
>>>>> ib-bds/orahivedp.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib
>>>>> -bds/oraloader.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-
>>>>> bds/ojdbc7.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bd
>>>>> s/orai18n.jar/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/
>>>>> kvclient.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/
>>>>> ojdbc7.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/
>>>>> ojdbc7-orig.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bd
>>>>> s/oracle-hadoop-sql.jar,/opt/oracle/bigdatasql/bdcell-12.1/
>>>>> jlib-bds/ora-hadoop-common.jar,/opt/oracle/bigdatasql/
>>>>> bdcell-12.1/jlib-bds/ora-hadoop-common-orig.jar,/opt/oracle/
>>>>> bigdatasql/bdcell-12.1/jlib-bds/orahivedp.jar,/opt/oracle/
>>>>> bigdatasql/bdcell-12.1/jlib-bds/orahivedp-orig.jar,/opt/
>>>

Re: Oracle JDBC - Spark SQL - Key Not Found: Scale

2017-01-26 Thread ayan guha
Okay, it is working with varchar columns only. Is there any way to work
around this?
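
One possible workaround consistent with that observation (not from the thread, so treat it as an assumption) is to push the conversion down to Oracle by reading a subquery that renders the NUMBER column as text, then casting back in Spark; the table and column names are the ones from the example above:

# read a subquery instead of the table; TO_CHAR returns VARCHAR2, which reads fine
query = "(SELECT TO_CHAR(pid) AS pid, des FROM agtest1) t"
df = sqlContext.read.jdbc(url=url, table=query,
                          properties={"user": user, "password": password,
                                      "driver": driver})
df = df.withColumn("pid", df["pid"].cast("long"))   # restore the numeric type in Spark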

On Fri, Jan 27, 2017 at 12:22 PM, ayan guha <guha.a...@gmail.com> wrote:

> hi
>
> I thought so too, so I created a table with INT and Varchar columns
>
> desc agtest1
>
> Name Null Type
>   -
> PID   NUMBER(38)
> DES   VARCHAR2(100)
>
> url="jdbc:oracle:thin:@mpimpclu1-scan:1521/DEVAIM"
> table = "agtest1"
> user = "bal"
> password= "bal"
> driver="oracle.jdbc.OracleDriver"
> df = sqlContext.read.jdbc(url=url,table=table,properties={"user"
> :user,"password":password,"driver":driver})
>
>
> Still the issue persists.
>
> On Fri, Jan 27, 2017 at 11:19 AM, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> Hi,
>>
>> I think you got this error because you used `NUMERIC` types in your
>> schema (https://github.com/apache/spark/blob/master/sql/core/src/
>> main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala#L32). So, IIUC
>> avoiding the type is a workaround.
>>
>> // maropu
>>
>>
>> On Fri, Jan 27, 2017 at 8:18 AM, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> I am facing exact issue with Oracle/Exadataas mentioned here
>>> <http://stackoverflow.com/questions/41873449/sparksql-key-not-found-scale>.
>>> Any idea? I could not figure out so sending to this grou hoping someone
>>> have see it (and solved it)
>>>
>>> Spark Version: 1.6
>>> pyspark command:
>>>
>>> pyspark --driver-class-path /opt/oracle/bigdatasql/bdcell-
>>> 12.1/jlib-bds/kvclient.jar:/opt/oracle/bigdatasql/bdcell-12.
>>> 1/jlib-bds/ojdbc7.jar:/opt/oracle/bigdatasql/bdcell-12.1/jli
>>> b-bds/ojdbc7-orig.jar:/opt/oracle/bigdatasql/bdcell-12.1/jli
>>> b-bds/oracle-hadoop-sql.jar:/opt/oracle/bigdatasql/bdcell-12
>>> .1/jlib-bds/ora-hadoop-common.jar:/opt/oracle/bigdatasql/
>>> bdcell-12.1/jlib-bds/ora-hadoop-common-orig.jar:/opt/oracle/
>>> bigdatasql/bdcell-12.1/jlib-bds/orahivedp.jar:/opt/oracle/
>>> bigdatasql/bdcell-12.1/jlib-bds/orahivedp-orig.jar:/opt/
>>> oracle/bigdatasql/bdcell-12.1/jlib-bds/orai18n.jar:/opt/
>>> oracle/bigdatasql/bdcell-12.1/jlib-bds/orai18n-orig.jar:/
>>> opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oraloader.jar:/
>>> opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oraloader-orig.jar   --conf
>>> spark.jars=/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oracl
>>> e-hadoop-sql.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-
>>> bds/ora-hadoop-common.jar,/opt/oracle/bigdatasql/bdcell-
>>> 12.1/jlib-bds/orahivedp.jar,/opt/oracle/bigdatasql/bdcell-
>>> 12.1/jlib-bds/oraloader.jar,/opt/oracle/bigdatasql/bdcell-
>>> 12.1/jlib-bds/ojdbc7.jar,/opt/oracle/bigdatasql/bdcell-12.1/
>>> jlib-bds/orai18n.jar/opt/oracle/bigdatasql/bdcell-12.1/
>>> jlib-bds/kvclient.jar,/opt/oracle/bigdatasql/bdcell-12.1/
>>> jlib-bds/ojdbc7.jar,/opt/oracle/bigdatasql/bdcell-12.1/
>>> jlib-bds/ojdbc7-orig.jar,/opt/oracle/bigdatasql/bdcell-12.1/
>>> jlib-bds/oracle-hadoop-sql.jar,/opt/oracle/bigdatasql/
>>> bdcell-12.1/jlib-bds/ora-hadoop-common.jar,/opt/oracle/
>>> bigdatasql/bdcell-12.1/jlib-bds/ora-hadoop-common-orig.
>>> jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/
>>> orahivedp.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-
>>> bds/orahivedp-orig.jar,/opt/oracle/bigdatasql/bdcell-12.1/
>>> jlib-bds/orai18n.jar,/opt/oracle/bigdatasql/bdcell-12.1/
>>> jlib-bds/orai18n-orig.jar,/opt/oracle/bigdatasql/bdcell-
>>> 12.1/jlib-bds/oraloader.jar,/opt/oracle/bigdatasql/bdcell-
>>> 12.1/jlib-bds/oraloader-orig.jar
>>>
>>>
>>> Here is my code:
>>>
>>> url="jdbc:oracle:thin:@mpimpclu1-scan:1521/DEVAIM"
>>> table = "HIST_FORECAST_NEXT_BILL_DGTL"
>>> user = "bal"
>>> password= "bal"
>>> driver="oracle.jdbc.OracleDriver"
>>> df = sqlContext.read.jdbc(url=url,table=table,properties={"user":
>>> user,"password":password,"driver":driver})
>>>
>>>
>>> Error:
>>> Traceback (most recent call last):
>>>   File "", line 1, in 
>>>   File "/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p2001.2081/lib/s
>>> park/python/pyspark/sql/readwriter.py", line 289, in jdbc
>>> return self._df(self._jreader.jdbc(url, table, jprop))
>>>   File "/opt

Re: Oracle JDBC - Spark SQL - Key Not Found: Scale

2017-01-26 Thread ayan guha
Hi,

I thought so too, so I created a table with INT and VARCHAR columns:

desc agtest1

Name  Null  Type
----  ----  --------------
PID         NUMBER(38)
DES         VARCHAR2(100)

url="jdbc:oracle:thin:@mpimpclu1-scan:1521/DEVAIM"
table = "agtest1"
user = "bal"
password= "bal"
driver="oracle.jdbc.OracleDriver"
df =
sqlContext.read.jdbc(url=url,table=table,properties={"user":user,"password":password,"driver":driver})


Still the issue persists.

On Fri, Jan 27, 2017 at 11:19 AM, Takeshi Yamamuro <linguin@gmail.com>
wrote:

> Hi,
>
> I think you got this error because you used `NUMERIC` types in your schema
> (https://github.com/apache/spark/blob/master/sql/core/
> src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala#L32). So,
> IIUC avoiding the type is a workaround.
>
> // maropu
>
>
> On Fri, Jan 27, 2017 at 8:18 AM, ayan guha <guha.a...@gmail.com> wrote:
>
>> Hi
>>
>> I am facing exact issue with Oracle/Exadataas mentioned here
>> <http://stackoverflow.com/questions/41873449/sparksql-key-not-found-scale>.
>> Any idea? I could not figure out so sending to this grou hoping someone
>> have see it (and solved it)
>>
>> Spark Version: 1.6
>> pyspark command:
>>
>> pyspark --driver-class-path /opt/oracle/bigdatasql/bdcell-
>> 12.1/jlib-bds/kvclient.jar:/opt/oracle/bigdatasql/bdcell-12.
>> 1/jlib-bds/ojdbc7.jar:/opt/oracle/bigdatasql/bdcell-12.1/jli
>> b-bds/ojdbc7-orig.jar:/opt/oracle/bigdatasql/bdcell-12.1/jli
>> b-bds/oracle-hadoop-sql.jar:/opt/oracle/bigdatasql/bdcell-
>> 12.1/jlib-bds/ora-hadoop-common.jar:/opt/oracle/bigdata
>> sql/bdcell-12.1/jlib-bds/ora-hadoop-common-orig.jar:/opt/
>> oracle/bigdatasql/bdcell-12.1/jlib-bds/orahivedp.jar:/opt/
>> oracle/bigdatasql/bdcell-12.1/jlib-bds/orahivedp-orig.jar:/
>> opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/orai18n.jar:/
>> opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/orai18n-orig.
>> jar:/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/
>> oraloader.jar:/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oraloader-orig.jar
>>   --conf spark.jars=/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/
>> oracle-hadoop-sql.jar,/opt/oracle/bigdatasql/bdcell-12.1/
>> jlib-bds/ora-hadoop-common.jar,/opt/oracle/bigdatasql/
>> bdcell-12.1/jlib-bds/orahivedp.jar,/opt/oracle/bigd
>> atasql/bdcell-12.1/jlib-bds/oraloader.jar,/opt/oracle/bigd
>> atasql/bdcell-12.1/jlib-bds/ojdbc7.jar,/opt/oracle/bigdata
>> sql/bdcell-12.1/jlib-bds/orai18n.jar/opt/oracle/bigdata
>> sql/bdcell-12.1/jlib-bds/kvclient.jar,/opt/oracle/bigda
>> tasql/bdcell-12.1/jlib-bds/ojdbc7.jar,/opt/oracle/bigdata
>> sql/bdcell-12.1/jlib-bds/ojdbc7-orig.jar,/opt/oracle/
>> bigdatasql/bdcell-12.1/jlib-bds/oracle-hadoop-sql.jar,/
>> opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/ora-hadoop-
>> common.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/ora-
>> hadoop-common-orig.jar,/opt/oracle/bigdatasql/bdcell-12.1/
>> jlib-bds/orahivedp.jar,/opt/oracle/bigdatasql/bdcell-12.1/
>> jlib-bds/orahivedp-orig.jar,/opt/oracle/bigdatasql/bdcell-
>> 12.1/jlib-bds/orai18n.jar,/opt/oracle/bigdatasql/bdcell-
>> 12.1/jlib-bds/orai18n-orig.jar,/opt/oracle/bigdatasql/
>> bdcell-12.1/jlib-bds/oraloader.jar,/opt/oracle/
>> bigdatasql/bdcell-12.1/jlib-bds/oraloader-orig.jar
>>
>>
>> Here is my code:
>>
>> url="jdbc:oracle:thin:@mpimpclu1-scan:1521/DEVAIM"
>> table = "HIST_FORECAST_NEXT_BILL_DGTL"
>> user = "bal"
>> password= "bal"
>> driver="oracle.jdbc.OracleDriver"
>> df = sqlContext.read.jdbc(url=url,table=table,properties={"user":
>> user,"password":password,"driver":driver})
>>
>>
>> Error:
>> Traceback (most recent call last):
>>   File "", line 1, in 
>>   File "/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p2001.2081/lib/s
>> park/python/pyspark/sql/readwriter.py", line 289, in jdbc
>> return self._df(self._jreader.jdbc(url, table, jprop))
>>   File "/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p2001.2081/lib/s
>> park/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in
>> __call__
>>   File "/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p2001.2081/lib/s
>> park/python/pyspark/sql/utils.py", line 45, in deco
>> return f(*a, **kw)
>>   File "/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p2001.2081/lib/s
>> park/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in
>> get_return_value
>> py4j.protocol.Py4JJavaError: An error occurred while calling

Oracle JDBC - Spark SQL - Key Not Found: Scale

2017-01-26 Thread ayan guha
Hi

I am facing the exact issue with Oracle/Exadata as mentioned here
<http://stackoverflow.com/questions/41873449/sparksql-key-not-found-scale>.
Any idea? I could not figure it out, so I am sending it to this group hoping
someone has seen it (and solved it).

Spark Version: 1.6
pyspark command:

pyspark --driver-class-path
/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/kvclient.jar:/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/ojdbc7.jar:/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/ojdbc7-orig.jar:/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oracle-hadoop-sql.jar:/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/ora-hadoop-common.jar:/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/ora-hadoop-common-orig.jar:/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/orahivedp.jar:/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/orahivedp-orig.jar:/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/orai18n.jar:/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/orai18n-orig.jar:/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oraloader.jar:/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oraloader-orig.jar
  --conf
spark.jars=/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oracle-hadoop-sql.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/ora-hadoop-common.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/orahivedp.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oraloader.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/ojdbc7.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/orai18n.jar/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/kvclient.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/ojdbc7.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/ojdbc7-orig.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oracle-hadoop-sql.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/ora-hadoop-common.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/ora-hadoop-common-orig.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/orahivedp.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/orahivedp-orig.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/orai18n.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/orai18n-orig.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oraloader.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oraloader-orig.jar


Here is my code:

url="jdbc:oracle:thin:@mpimpclu1-scan:1521/DEVAIM"
table = "HIST_FORECAST_NEXT_BILL_DGTL"
user = "bal"
password= "bal"
driver="oracle.jdbc.OracleDriver"
df =
sqlContext.read.jdbc(url=url,table=table,properties={"user":user,"password":password,"driver":driver})


Error:
Traceback (most recent call last):
  File "", line 1, in 
  File
"/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p2001.2081/lib/spark/python/pyspark/sql/readwriter.py",
line 289, in jdbc
return self._df(self._jreader.jdbc(url, table, jprop))
  File
"/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p2001.2081/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py",
line 813, in __call__
  File
"/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p2001.2081/lib/spark/python/pyspark/sql/utils.py",
line 45, in deco
return f(*a, **kw)
  File
"/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p2001.2081/lib/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py",
line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o40.jdbc.
: java.util.NoSuchElementException: key not found: scale
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:58)
at org.apache.spark.sql.types.Metadata.get(Metadata.scala:108)
at org.apache.spark.sql.types.Metadata.getLong(Metadata.scala:51)
at
org.apache.spark.sql.jdbc.OracleDialect$.getCatalystType(OracleDialect.scala:33)
at
org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:140)
at
org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.(JDBCRelation.scala:91)
at
org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:222)
at
org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:146)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)



-- 
Best Regards,
Ayan Guha


Re: Spark SQL DataFrame to Kafka Topic

2017-01-24 Thread ayan guha
Is there a plan to have this in PySpark in some later release?

On Wed, 25 Jan 2017 at 10:01 am, Koert Kuipers <ko...@tresata.com> wrote:

> i implemented a sink using foreach it was indeed straightforward thanks
>
> On Fri, Jan 13, 2017 at 6:30 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
> Structured Streaming has a foreach sink, where you can essentially do what
> you want with your data. Its easy to create a Kafka producer, and write the
> data out to kafka.
>
> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach
>
> On Fri, Jan 13, 2017 at 8:28 AM, Koert Kuipers <ko...@tresata.com> wrote:
>
> how do you do this with structured streaming? i see no mention of writing
> to kafka
>
> On Fri, Jan 13, 2017 at 10:30 AM, Peyman Mohajerian <mohaj...@gmail.com>
> wrote:
>
> Yes, it is called Structured Streaming:
> https://docs.databricks.com/_static/notebooks/structured-streaming-kafka.html
>
> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
>
> On Fri, Jan 13, 2017 at 3:32 AM, Senthil Kumar <senthilec...@gmail.com>
> wrote:
>
> Hi Team ,
>
>  Sorry if this question already asked in this forum..
>
> Can we ingest data to Apache Kafka Topic from Spark SQL DataFrame ??
>
> Here is my Code which Reads Parquet File :
>
> *val sqlContext = new org.apache.spark.sql.SQLContext(sc);*
>
> *val df = sqlContext.read.parquet("/temp/*.parquet")*
>
> *df.registerTempTable("beacons")*
>
>
> I want to directly ingest df DataFrame to Kafka ! Is there any way to
> achieve this ??
>
>
> Cheers,
>
> Senthil
>
>
>
>
>
> --
Best Regards,
Ayan Guha


Re: Will be in around 12:30pm due to some personal stuff

2017-01-19 Thread ayan guha
Sure...we will wait :) :)

Just kidding

On Fri, Jan 20, 2017 at 4:48 PM, Manohar753 <manohar.re...@happiestminds.com
> wrote:

> Get Outlook for Android <https://aka.ms/ghei36>
> --
> Happiest Minds Disclaimer
>
> This message is for the sole use of the intended recipient(s) and may
> contain confidential, proprietary or legally privileged information. Any
> unauthorized review, use, disclosure or distribution is prohibited. If you
> are not the original intended recipient of the message, please contact the
> sender by reply email and destroy all copies of the original message.
> Happiest Minds Technologies <http://www.happiestminds.com>
>
> --
>
> --
> View this message in context: Will be in around 12:30pm due to some
> personal stuff
> <http://apache-spark-user-list.1001560.n3.nabble.com/Will-be-in-around-12-30pm-due-to-some-personal-stuff-tp28326.html>
> Sent from the Apache Spark User List mailing list archive
> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>



-- 
Best Regards,
Ayan Guha


Re: Flume and Spark Streaming

2017-01-16 Thread ayan guha
With Flume, what would be your sink?



On Mon, Jan 16, 2017 at 10:44 PM, Guillermo Ortiz <konstt2...@gmail.com>
wrote:

> I'm wondering to use Flume (channel file)-Spark Streaming.
>
> I have some doubts about it:
>
> 1.The RDD size is all data what it comes in a microbatch which you have
> defined. Risght?
>
> 2.If there are 2Gb of data, how many are RDDs generated? just one and I
> have to make a repartition?
>
> 3.When is the ACK sent back  from Spark to Flume?
>   I guess that if Flume dies, Flume is going to send the same data again
> to Spark
>   If Spark dies, I have no idea if Spark is going to reprocessing same
> data again when it is sent again.
>   Coult it be different if I use Kafka Channel?
>
>
>
>
>


-- 
Best Regards,
Ayan Guha


Re: Old version of Spark [v1.2.0]

2017-01-15 Thread ayan guha
No worries... I also faced this issue a while back, and good people in the
community helped me :)

On Mon, Jan 16, 2017 at 9:55 AM, Md. Rezaul Karim <
rezaul.ka...@insight-centre.org> wrote:

> Hi Ayan,
>
> Thanks a million.
>
> Regards,
> _
> *Md. Rezaul Karim*, BSc, MSc
> PhD Researcher, INSIGHT Centre for Data Analytics
> National University of Ireland, Galway
> IDA Business Park, Dangan, Galway, Ireland
> Web: http://www.reza-analytics.eu/index.html
> <http://139.59.184.114/index.html>
>
> On 15 January 2017 at 22:48, ayan guha <guha.a...@gmail.com> wrote:
>
>> archive.apache.org will always have all the releases:
>> http://archive.apache.org/dist/spark/
>>
>> @Spark users: it may be a good idea to have a "To download older
>> versions, click here" link to Spark Download page?
>>
>> On Mon, Jan 16, 2017 at 8:16 AM, Md. Rezaul Karim <
>> rezaul.ka...@insight-centre.org> wrote:
>>
>>> Hi,
>>>
>>> I am looking for Spark 1.2.0 version. I tried to download in the Spark
>>> website but it's no longer available.
>>>
>>> Any suggestion?
>>>
>>>
>>>
>>>
>>>
>>>
>>> Regards,
>>> _
>>> *Md. Rezaul Karim*, BSc, MSc
>>> PhD Researcher, INSIGHT Centre for Data Analytics
>>> National University of Ireland, Galway
>>> IDA Business Park, Dangan, Galway, Ireland
>>> Web: http://www.reza-analytics.eu/index.html
>>> <http://139.59.184.114/index.html>
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


-- 
Best Regards,
Ayan Guha


Re: Old version of Spark [v1.2.0]

2017-01-15 Thread ayan guha
archive.apache.org will always have all the releases:
http://archive.apache.org/dist/spark/

@Spark users: it may be a good idea to have a "To download older versions,
click here" link to Spark Download page?

On Mon, Jan 16, 2017 at 8:16 AM, Md. Rezaul Karim <
rezaul.ka...@insight-centre.org> wrote:

> Hi,
>
> I am looking for Spark 1.2.0 version. I tried to download in the Spark
> website but it's no longer available.
>
> Any suggestion?
>
>
>
>
>
>
> Regards,
> _
> *Md. Rezaul Karim*, BSc, MSc
> PhD Researcher, INSIGHT Centre for Data Analytics
> National University of Ireland, Galway
> IDA Business Park, Dangan, Galway, Ireland
> Web: http://www.reza-analytics.eu/index.html
> <http://139.59.184.114/index.html>
>



-- 
Best Regards,
Ayan Guha


Re: Can't load a RandomForestClassificationModel in Spark job

2017-01-12 Thread ayan guha
Hi

Given that training and prediction are two different applications, I typically
save the model objects to HDFS and load them back during the prediction stages.

Best
Ayan
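
A minimal PySpark sketch of that flow (paths are placeholders). Note that the model is loaded once on the driver and applied with transform, rather than calling load() inside a map function, which is where the hang was reported below:

from pyspark.ml.classification import RandomForestClassificationModel

# training application: persist the fitted model to HDFS (path is a placeholder)
model.save("hdfs:///models/rf_profile_a")

# prediction application: load once on the driver, then score a DataFrame
loaded = RandomForestClassificationModel.load("hdfs:///models/rf_profile_a")
predictions = loaded.transform(features_df)   # features_df is hypothetical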
On Fri, 13 Jan 2017 at 5:39 am, Sumona Routh  wrote:

> Hi all,
> I've been working with Spark mllib 2.0.2 RandomForestClassificationModel.
>
> I encountered two frustrating issues and would really appreciate some
> advice:
>
> 1)  RandomForestClassificationModel is effectively not serializable (I
> assume it's referencing something that can't be serialized, since it itself
> extends serializable), so I ended up with the well-known exception:
> org.apache.spark.SparkException: Task not serializable.
> Basically, my original intention was to pass the model as a parameter
>
> because which model we use is dynamic based on what record we are
>
> predicting on.
>
> Has anyone else encountered this? Is this currently being addressed? I
> would expect objects from Spark's own libraries be able to be used
> seamlessly in their applications without these types of exceptions.
>
> 2) The RandomForestClassificationModel.load method appears to hang
> indefinitely when executed from inside a map function (which I assume is
> passed to the executor). So, I basically cannot load a model from a worker.
> We have multiple "profiles" that use differently trained models, which are
> accessed from within a map function to run predictions on different sets of
> data.
> The thread that is hanging has this as the latest (most pertinent) code:
>
> org.apache.spark.ml.util.DefaultParamsReader$.loadMetadata(ReadWrite.scala:391)
> Looking at the code in github, it appears that it is calling sc.textFile.
> I could not find anything stating that this particular function would not
> work from within a map function.
>
> Are there any suggestions as to how I can get this model to work on a real
> production job (either by allowing it to be serializable and passed around
> or loaded from a worker)?
>
> I've extenisvely POCed this model (saving, loading, transforming,
> training, etc.), however this is the first time I'm attempting to use it
> from within a real application.
>
> Sumona
>


Re: Add row IDs column to data frame

2017-01-12 Thread ayan guha
Just in case you are more comfortable with SQL,

row_number() over (order by <some column>)

should also generate a unique id (Spark requires an ORDER BY inside the OVER
clause for row_number).
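
A minimal PySpark sketch (the ordering column "value" is an assumption; note that an un-partitioned window pulls all rows into a single partition, which is fine for modest data sizes):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# assign ids 1..N ordered by an existing column
w = Window.orderBy("value")
indexed_df = df.withColumn("id", F.row_number().over(w))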

On Thu, Jan 12, 2017 at 7:00 PM, akbar501 <akbar...@gmail.com> wrote:

> The following are 2 different approaches to adding an id/index to RDDs and
> 1
> approach to adding an index to a DataFrame.
>
> Add an index column to an RDD
>
>
> ```scala
> // RDD
> val dataRDD = sc.textFile("./README.md")
> // Add index then set index as key in map() transformation
> // Results in RDD[(Long, String)]
> val indexedRDD = dataRDD.zipWithIndex().map(pair => (pair._2, pair._1))
> ```
>
> Add a unique id column to an RDD
>
>
> ```scala
> // RDD
> val dataRDD = sc.textFile("./README.md")
> // Add unique id then set id as key in map() transformation
> // Results in RDD[(Long, String)]
> val indexedRDD = dataRDD.zipWithUniqueId().map(pair => (pair._2, pair._1))
> indexedRDD.collect
> ```
>
> Add an index column to a DataFrame
>
>
> Note: You could use a similar approach with a Dataset.
>
> ```scala
> import spark.implicits._
> import org.apache.spark.sql.functions.monotonicallyIncreasingId
>
> val dataDF = spark.read.textFile("./README.md")
> val indexedDF = dataDF.withColumn("id", monotonically_increasing_id)
> indexedDF.select($"id", $"value").show
> ```
>
>
>
> -
> Delixus.com - Spark Consulting
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Append-column-to-Data-Frame-or-RDD-
> tp22385p28300.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Best Regards,
Ayan Guha


Re: how to change datatype by useing StructType

2017-01-11 Thread ayan guha
Do you have a "year" column in your data?
On Thu, 12 Jan 2017 at 5:24 pm, lk_spark  wrote:

> hi, all
>
> I have a txt file, and I want to process it as a dataframe.
>
> Data like this:
>
>    name1,30
>    name2,18
>
> val schemaString = "name age year"
> val xMap = new scala.collection.mutable.HashMap[String,DataType]()
> xMap.put("name", StringType)
> xMap.put("age", IntegerType)
> xMap.put("year", IntegerType)
>
> val fields = schemaString.split(" ").map(fieldName =>
>   StructField(fieldName, xMap.get(fieldName).get, nullable = true))
> val schema = StructType(fields)
>
> val peopleRDD = spark.sparkContext.textFile("/sourcedata/test/test*")
> //spark.read.schema(schema).text("/sourcedata/test/test*")
>
> val rowRDD = peopleRDD.map(_.split(",")).map(attributes =>
>   Row(attributes(0),attributes(1))
>
> // Apply the schema to the RDD
> val peopleDF = spark.createDataFrame(rowRDD, schema)
>
> but when I write it to a table or show it, I get this error:
>
>   Caused by: java.lang.RuntimeException: Error while encoding:
>   java.lang.RuntimeException: java.lang.String is not a valid external type
>   for schema of int
>   if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row
>   object).isNullAt) null else staticinvoke(class
>   org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
>   validateexternaltype(getexternalrowfield(assertnotnull(input[0,
>   org.apache.spark.sql.Row, true], top level row object), 0, name),
>   StringType), true) AS name#1
>   +- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level
>   row object).isNullAt) null else staticinvoke(class
>   org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
>   validateexternaltype(getexternalrowfield(assertnotnull(input[0,
>   org.apache.spark.sql.Row, true], top level row object), 0, name),
>   StringType), true)
>
> If I change my code it will work:
>
>   val rowRDD = peopleRDD.map(_.split(",")).map(attributes =>
>     Row(attributes(0),attributes(1).toInt)
>
> but this is not a good idea.
>
> 2017-01-12
>
> --
> lk_spark
>


Re: Efficient look up in Key Pair RDD

2017-01-08 Thread ayan guha
It is a Hive construct, supported since Hive 0.10, so I would be very
surprised if Spark does not support it... I can't speak for Spark 2.0, though
(haven't got a chance to touch it yet :) )

On Mon, Jan 9, 2017 at 2:33 PM, Anil Langote <anillangote0...@gmail.com>
wrote:

> Does it support in Spark Dataset 2.0 ?
>
>
>
>
>
> Thank you
>
> Anil Langote
>
> +1-425-633-9747 <+1%20425-633-9747>
>
>
>
>
>
> *From: *ayan guha <guha.a...@gmail.com>
> *Date: *Sunday, January 8, 2017 at 10:32 PM
> *To: *Anil Langote <anillangote0...@gmail.com>
>
> *Subject: *Re: Efficient look up in Key Pair RDD
>
>
>
> Hi
>
>
>
> Please have a look in this wiki
> <https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation,+Cube,+Grouping+and+Rollup>.
> Grouping Set is a variation of GROUP BY where you can specify the
> combinations in one go.
>
>
>
> For Example, if you have 2 attributes, you can roll up (ATT1),
> (ATT2,ATT2), (ATT2) by specifying the groups using grouping sets.
>
>
>
> Best
>
> Ayan
>
>
>
> On Mon, Jan 9, 2017 at 2:29 PM, Anil Langote <anillangote0...@gmail.com>
> wrote:
>
> Hi Ayan
>
>
>
> Thanks a lot for reply, what is GROUPING SET? I did try GROUP BY with UDAF
> but it doesn’t perform well. for one combination it takes 1.5 mins in my
> use case I have 400 combinations which will take ~400 mins I am looking for
> a solution which will scale on the combinations.
>
>
>
> Thank you
>
> Anil Langote
>
> +1-425-633-9747 <+1%20425-633-9747>
>
>
>
>
>
> *From: *ayan guha <guha.a...@gmail.com>
> *Date: *Sunday, January 8, 2017 at 10:26 PM
> *To: *Anil Langote <anillangote0...@gmail.com>
> *Cc: *Holden Karau <hol...@pigscanfly.ca>, user <user@spark.apache.org>
> *Subject: *Re: Efficient look up in Key Pair RDD
>
>
>
> Have you tried something like GROUPING SET? That seems to be the exact
> thing you are looking for
>
>
>
> On Mon, Jan 9, 2017 at 12:37 PM, Anil Langote <anillangote0...@gmail.com>
> wrote:
>
> Sure. Let me explain you my requirement I have an input file which has
> attributes (25) and las column is array of doubles (14500 elements in
> original file)
>
>
>
> Attribute_0 | Attribute_1 | Attribute_2 | Attribute_3 | DoubleArray
> 5 | 3 | 5 | 3 | 0.2938933463658645  0.0437040427073041  0.23002681025029648  0.18003221216680454
> 3 | 2 | 1 | 3 | 0.5353599620508771  0.026777650111232787  0.31473082754161674  0.2647786522276575
> 5 | 3 | 5 | 2 | 0.8803063581705307  0.8101324740101096  0.48523937757683544  0.5897714618376072
> 3 | 2 | 1 | 3 | 0.33960064683141955  0.46537001358164043  0.543428826489435  0.42653939565053034
> 2 | 2 | 0 | 5 | 0.5108235777360906  0.4368119043922922  0.8651556676944931  0.7451477943975504
>
>
>
> Now I have to compute the addition of the double for any given combination
> for example in above file we will have below possible combinations
>
>
>
> 1.  Attribute_0, Attribute_1
>
> 2.  Attribute_0, Attribute_2
>
> 3.  Attribute_0, Attribute_3
>
> 4.  Attribute_1, Attribute_2
>
> 5.  Attribute_2, Attribute_3
>
> 6.  Attribute_1, Attribute_3
>
> 7.  Attribute_0, Attribute_1, Attribute_2
>
> 8.  Attribute_0, Attribute_1, Attribute_3
>
> 9.  Attribute_0, Attribute_2, Attribute_3
>
> 10.  Attribute_1, Attribute_2, Attribute_3
>
> 11.  Attribute_1, Attribute_2, Attribute_3, Attribute_4
>
>
>
> now if we process the *Attribute_0, Attribute_1* combination we want
> below output. In similar way we have to process all the above combinations
>
>
>
> 5_3 ==>  [1.1741997045363952, 0.8538365167174137, 0.7152661878271319,
> 0.7698036740044117]
>
> 3_2 ==> [0.8749606088822967, 0.4921476636928732, 0.8581596540310518,
> 0.6913180478781878]
>
>
>
> Solution tried
>
>
>
> I have created parequet file which will have the schema and last column
> will be array of doubles. The size of the parquet file I have is 276G which
> has 2.65 M records.
>
>
>
> I have implemented the UDAF which will have
>
>
>
> Input schema : array of doubles
>
> Buffer schema : array of doubles
>
> Return schema : array of doubles
>
>
>
> I load the data from parquet file and then register the UDAF to use with
> below query, note that SUM is UDAF
>
>
>
> SELECT COUNT(*) AS MATCHES, SU

Re: Efficient look up in Key Pair RDD

2017-01-08 Thread ayan guha
Have you tried something like GROUPING SETS? That seems to be exactly what
you are looking for.
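
A hedged sketch of what such a query could look like for the use case described below (the table, column and UDAF names follow the thread's example and are assumptions; SUM_ARRAY stands in for the custom array-summing UDAF, and columns not present in a given grouping set come back as NULL):

result = spark.sql("""
    SELECT attribute_0, attribute_1, attribute_2,
           COUNT(*)               AS matches,
           SUM_ARRAY(doublearray) AS totals
    FROM raw_table
    GROUP BY attribute_0, attribute_1, attribute_2
    GROUPING SETS ((attribute_0, attribute_1),
                   (attribute_0, attribute_2),
                   (attribute_1, attribute_2))
    HAVING COUNT(*) > 1
""")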

On Mon, Jan 9, 2017 at 12:37 PM, Anil Langote <anillangote0...@gmail.com>
wrote:

> Sure. Let me explain you my requirement I have an input file which has
> attributes (25) and las column is array of doubles (14500 elements in
> original file)
>
>
>
> Attribute_0 | Attribute_1 | Attribute_2 | Attribute_3 | DoubleArray
> 5 | 3 | 5 | 3 | 0.2938933463658645  0.0437040427073041  0.23002681025029648  0.18003221216680454
> 3 | 2 | 1 | 3 | 0.5353599620508771  0.026777650111232787  0.31473082754161674  0.2647786522276575
> 5 | 3 | 5 | 2 | 0.8803063581705307  0.8101324740101096  0.48523937757683544  0.5897714618376072
> 3 | 2 | 1 | 3 | 0.33960064683141955  0.46537001358164043  0.543428826489435  0.42653939565053034
> 2 | 2 | 0 | 5 | 0.5108235777360906  0.4368119043922922  0.8651556676944931  0.7451477943975504
>
>
>
> Now I have to compute the addition of the double for any given combination
> for example in above file we will have below possible combinations
>
>
>
> 1.  Attribute_0, Attribute_1
>
> 2.  Attribute_0, Attribute_2
>
> 3.  Attribute_0, Attribute_3
>
> 4.  Attribute_1, Attribute_2
>
> 5.  Attribute_2, Attribute_3
>
> 6.  Attribute_1, Attribute_3
>
> 7.  Attribute_0, Attribute_1, Attribute_2
>
> 8.  Attribute_0, Attribute_1, Attribute_3
>
> 9.  Attribute_0, Attribute_2, Attribute_3
>
> 10.  Attribute_1, Attribute_2, Attribute_3
>
> 11.  Attribute_1, Attribute_2, Attribute_3, Attribute_4
>
>
>
> now if we process the *Attribute_0, Attribute_1* combination we want
> below output. In similar way we have to process all the above combinations
>
>
>
> 5_3 ==>  [1.1741997045363952, 0.8538365167174137, 0.7152661878271319,
> 0.7698036740044117]
>
> 3_2 ==> [0.8749606088822967, 0.4921476636928732, 0.8581596540310518,
> 0.6913180478781878]
>
>
>
> Solution tried
>
>
>
> I have created a parquet file with this schema, where the last column is an
> array of doubles. The parquet file is 276 GB and contains 2.65 M records.
>
>
>
> I have implemented the UDAF which will have
>
>
>
> Input schema : array of doubles
>
> Buffer schema : array of doubles
>
> Return schema : array of doubles
>
>
>
> I load the data from the parquet file and then register the UDAF to use with
> the query below (note that SUM is the UDAF)
>
>
>
> SELECT COUNT(*) AS MATCHES, SUM(DOUBLEARRAY), *Attribute_0, Attribute_1
> FROM RAW_TABLE GROUP BY Attribute_0, Attribute_1 HAVING COUNT(*)>1*
>
>
>
> This works fine and takes 1.2 minutes for one combination. My use case has
> 400 combinations, which means about 8 hours; that does not meet the SLA, and
> we want this to be below 1 hour. What is the best way to implement this use
> case?
>
> Best Regards,
>
> Anil Langote
>
> +1-425-633-9747
>
> On Jan 8, 2017, at 8:17 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
>
> To start with, caching and having a known partitioner will help a bit, then
> there is also the IndexedRDD project, but in general spark might not be the
> best tool for the job.  Have you considered having Spark output to
> something like memcache?
>
> What's the goal you are trying to accomplish?
>
> On Sun, Jan 8, 2017 at 5:04 PM Anil Langote <anillangote0...@gmail.com>
> wrote:
>
>> Hi All,
>>
>> I have a requirement where I wanted to build a distributed HashMap which
>> holds 10M key value pairs and provides very efficient lookups for each key.
>> I tried loading the file into a JavaPairRDD and calling the lookup method;
>> it is very slow.
>>
>> How can I achieve very very faster lookup by a given key?
>>
>> Thank you
>> Anil Langote
>>
>


-- 
Best Regards,
Ayan Guha


Re: Approach: Incremental data load from HBASE

2017-01-06 Thread ayan guha
IMHO you should not "think" of HBase in RDBMS terms, but you can use
ColumnFilters to filter out new records
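
A hedged sketch of that idea (not from the thread): restrict the scan to cells
written since the last run via the cell timestamp, plus a column filter on the
flag column. The family "cf", qualifier "flag" and lastRunTs are illustrative
names, not from the original post:

  import org.apache.hadoop.hbase.client.Scan
  import org.apache.hadoop.hbase.filter.{CompareFilter, SingleColumnValueFilter}
  import org.apache.hadoop.hbase.util.Bytes

  val lastRunTs: Long = 1483228800000L            // checkpoint saved after the previous batch
  val scan = new Scan()
  scan.setTimeRange(lastRunTs, Long.MaxValue)     // only cells newer than the checkpoint
  scan.setFilter(new SingleColumnValueFilter(
    Bytes.toBytes("cf"), Bytes.toBytes("flag"),
    CompareFilter.CompareOp.EQUAL, Bytes.toBytes("0")))
  // hand `scan` to TableInputFormat / the HBase-Spark connector to build the RDD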

On Fri, Jan 6, 2017 at 7:22 PM, Chetan Khatri <chetan.opensou...@gmail.com>
wrote:

> Hi Ayan,
>
> By incremental load from HBase I mean: a weekly batch job takes rows from an
> HBase table and dumps them out to Hive. The next time I run the job, it
> should only pick up newly arrived rows.
>
> Same as if we use Sqoop for incremental load from RDBMS to Hive with below
> command,
>
> sqoop job --create myssb1 -- import --connect
> jdbc:mysql://<host>:<port>/sakila --username admin --password admin
> --driver=com.mysql.jdbc.Driver --query "SELECT address_id, address,
> district, city_id, postal_code, alast_update, cityid, city, country_id,
> clast_update FROM(SELECT a.address_id as address_id, a.address as address,
> a.district as district, a.city_id as city_id, a.postal_code as postal_code,
> a.last_update as alast_update, c.city_id as cityid, c.city as city,
> c.country_id as country_id, c.last_update as clast_update FROM
> sakila.address a INNER JOIN sakila.city c ON a.city_id=c.city_id) as sub
> WHERE $CONDITIONS" --incremental lastmodified --check-column alast_update
> --last-value 1900-01-01 --target-dir /user/cloudera/ssb7 --hive-import
> --hive-table test.sakila -m 1 --hive-drop-import-delims --map-column-java
> address=String
>
> Probably I am looking for a tool from the HBase incubator family which does
> the job for me; alternatively, this can be done by reading HBase tables into
> an RDD and saving the RDD to Hive.
>
> Thanks.
>
>
> On Thu, Jan 5, 2017 at 2:02 AM, ayan guha <guha.a...@gmail.com> wrote:
>
>> Hi Chetan
>>
>> What do you mean by incremental load from HBase? There is a timestamp
>> marker for each cell, but not at Row level.
>>
>> On Wed, Jan 4, 2017 at 10:37 PM, Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Ted Yu,
>>>
>>> You understood wrong, i said Incremental load from HBase to Hive,
>>> individually you can say Incremental Import from HBase.
>>>
>>> On Wed, Dec 21, 2016 at 10:04 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> Incremental load traditionally means generating hfiles and
>>>> using org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles to load
>>>> the data into hbase.
>>>>
>>>> For your use case, the producer needs to find rows where the flag is 0
>>>> or 1.
>>>> After such rows are obtained, it is up to you how the result of
>>>> processing is delivered to hbase.
>>>>
>>>> Cheers
>>>>
>>>> On Wed, Dec 21, 2016 at 8:00 AM, Chetan Khatri <
>>>> chetan.opensou...@gmail.com> wrote:
>>>>
>>>>> Ok, Sure will ask.
>>>>>
>>>>> But what would be generic best practice solution for Incremental load
>>>>> from HBASE.
>>>>>
>>>>> On Wed, Dec 21, 2016 at 8:42 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>
>>>>>> I haven't used Gobblin.
>>>>>> You can consider asking Gobblin mailing list of the first option.
>>>>>>
>>>>>> The second option would work.
>>>>>>
>>>>>>
>>>>>> On Wed, Dec 21, 2016 at 2:28 AM, Chetan Khatri <
>>>>>> chetan.opensou...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello Guys,
>>>>>>>
>>>>>>> I would like to understand different approach for Distributed
>>>>>>> Incremental load from HBase, Is there any *tool / incubator tool* which
>>>>>>> satisfy requirement ?
>>>>>>>
>>>>>>> *Approach 1:*
>>>>>>>
>>>>>>> Write Kafka Producer and maintain manually column flag for events
>>>>>>> and ingest it with Linkedin Gobblin to HDFS / S3.
>>>>>>>
>>>>>>> *Approach 2:*
>>>>>>>
>>>>>>> Run Scheduled Spark Job - Read from HBase and do transformations and
>>>>>>> maintain flag column at HBase Level.
>>>>>>>
>>>>>>> In above both approach, I need to maintain column level flags. such
>>>>>>> as 0 - by default, 1-sent,2-sent and acknowledged. So next time Producer
>>>>>>> will take another 1000 rows of batch where flag is 0 or 1.
>>>>>>>
>>>>>>> I am looking for best practice approach with any distributed tool.
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>> - Chetan Khatri
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


-- 
Best Regards,
Ayan Guha


Re: Approach: Incremental data load from HBASE

2017-01-04 Thread ayan guha
Hi Chetan

What do you mean by incremental load from HBase? There is a timestamp
marker for each cell, but not at Row level.

On Wed, Jan 4, 2017 at 10:37 PM, Chetan Khatri <chetan.opensou...@gmail.com>
wrote:

> Ted Yu,
>
> You understood wrong, i said Incremental load from HBase to Hive,
> individually you can say Incremental Import from HBase.
>
> On Wed, Dec 21, 2016 at 10:04 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Incremental load traditionally means generating hfiles and
>> using org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles to load
>> the data into hbase.
>>
>> For your use case, the producer needs to find rows where the flag is 0 or
>> 1.
>> After such rows are obtained, it is up to you how the result of
>> processing is delivered to hbase.
>>
>> Cheers
>>
>> On Wed, Dec 21, 2016 at 8:00 AM, Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Ok, Sure will ask.
>>>
>>> But what would be generic best practice solution for Incremental load
>>> from HBASE.
>>>
>>> On Wed, Dec 21, 2016 at 8:42 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> I haven't used Gobblin.
>>>> You can consider asking Gobblin mailing list of the first option.
>>>>
>>>> The second option would work.
>>>>
>>>>
>>>> On Wed, Dec 21, 2016 at 2:28 AM, Chetan Khatri <
>>>> chetan.opensou...@gmail.com> wrote:
>>>>
>>>>> Hello Guys,
>>>>>
>>>>> I would like to understand different approach for Distributed
>>>>> Incremental load from HBase, Is there any *tool / incubator tool* which
>>>>> satisfy requirement ?
>>>>>
>>>>> *Approach 1:*
>>>>>
>>>>> Write Kafka Producer and maintain manually column flag for events and
>>>>> ingest it with Linkedin Gobblin to HDFS / S3.
>>>>>
>>>>> *Approach 2:*
>>>>>
>>>>> Run Scheduled Spark Job - Read from HBase and do transformations and
>>>>> maintain flag column at HBase Level.
>>>>>
>>>>> In above both approach, I need to maintain column level flags. such as
>>>>> 0 - by default, 1-sent,2-sent and acknowledged. So next time Producer will
>>>>> take another 1000 rows of batch where flag is 0 or 1.
>>>>>
>>>>> I am looking for best practice approach with any distributed tool.
>>>>>
>>>>> Thanks.
>>>>>
>>>>> - Chetan Khatri
>>>>>
>>>>
>>>>
>>>
>>
>


-- 
Best Regards,
Ayan Guha


Re: [Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?

2017-01-03 Thread ayan guha
Ahh, I see what you mean... I confused two terminologies, because we were
talking about partitioning and then changed topic to identifying changed data.


For that, you can "construct" a dbtable as an inline view -

viewSQL = "(select * from table where <ts_col> > '<checkpoint>')"
            .replace("<ts_col>", "inserted_on")
            .replace("<checkpoint>", checkPointedValue)
dbtable = viewSQL

refer to this
<http://www.sparkexpert.com/2015/03/28/loading-database-data-into-spark-using-data-sources-api/>
blog...

So, in summary, you have 2 things

1. Identify changed data - my suggestion to use dbtable with inline view
2. parallelism - use numPartitions, lowerBound and upperBound to generate the
number of partitions

HTH
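
A minimal sketch of both points together, with illustrative table/column names
and assuming a Spark 2.x session named spark (use sqlContext.read on 1.x):

  val checkPointedValue = "2017-01-01 00:00:00"                    // from your checkpoint store
  val viewSQL = s"(select * from mytable where inserted_on > '$checkPointedValue') t"

  val df = spark.read.format("jdbc")
    .option("url", jdbcUrl)                  // assumed to be defined elsewhere
    .option("dbtable", viewSQL)              // 1. inline view returns only changed rows
    .option("partitionColumn", "id")         // 2. numeric column used to split the read
    .option("lowerBound", "1")
    .option("upperBound", "1000000")
    .option("numPartitions", "10")
    .load()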



On Wed, Jan 4, 2017 at 3:46 AM, Yuanzhe Yang <yyz1...@gmail.com> wrote:

> Hi Ayan,
>
> Yeah, I understand your proposal, but according to
> http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases,
> it says
>
> Notice that lowerBound and upperBound are just used to decide the
> partition stride, not for filtering the rows in table. So all rows in the
> table will be partitioned and returned. This option applies only to reading.
>
> So my interpretation is all rows in the table are ingested, and this
> "lowerBound" and "upperBound" is the span of each partition. Well, I am not
> a native English speaker, maybe it means differently?
>
> Best regards,
> Yang
>
> 2017-01-03 17:23 GMT+01:00 ayan guha <guha.a...@gmail.com>:
>
>> Hi
>>
>> You need to store and capture the Max of the column you intend to use for
>> identifying new records (Ex: INSERTED_ON) after every successful run of
>> your job. Then, use the value in lowerBound option.
>>
>> Essentially, you want to create a query like
>>
>> select * from table where INSERTED_ON > lowerBound and
>> INSERTED_ON <= <the max value you captured> every time you run the job
>>
>>
>>
>> On Wed, Jan 4, 2017 at 2:13 AM, Yuanzhe Yang <yyz1...@gmail.com> wrote:
>>
>>> Hi Ayan,
>>>
>>> Thanks a lot for your suggestion. I am currently looking into sqoop.
>>>
>>> Concerning your suggestion for Spark, it is indeed parallelized with
>>> multiple workers, but the job is one-off and cannot keep streaming.
>>> Moreover, I cannot specify any "start row" in the job, it will always
>>> ingest the entire table. So I also cannot simulate a streaming process by
>>> starting the job in fix intervals...
>>>
>>> Best regards,
>>> Yang
>>>
>>> 2017-01-03 15:06 GMT+01:00 ayan guha <guha.a...@gmail.com>:
>>>
>>>> Hi
>>>>
>>>> While the solutions provided by others look promising and I'd like to
>>>> try out a few of them, our old pal sqoop already "does" the job. It has an
>>>> incremental mode where you can provide a --check-column and
>>>> --last-modified-value combination to grab the data - and yes, sqoop
>>>> essentially does it by running a MAP-only job which spawns number of
>>>> parallel map task to grab data from DB.
>>>>
>>>> In Spark, you can use sqlContext.load function for JDBC and use
>>>> partitionColumn and numPartition to define parallelism of connection.
>>>>
>>>> Best
>>>> Ayan
>>>>
>>>> On Tue, Jan 3, 2017 at 10:49 PM, Yuanzhe Yang <yyz1...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Ayan,
>>>>>
>>>>> Thanks a lot for such a detailed response. I really appreciate it!
>>>>>
>>>>> I think this use case can be generalized, because the data is
>>>>> immutable and append-only. We only need to find one column or timestamp to
>>>>> track the last row consumed in the previous ingestion. This pattern should
>>>>> be common when storing sensor data. If the data is mutable, then the
>>>>> solution will be surely difficult and vendor specific as you said.
>>>>>
>>>>> The workflow you proposed is very useful. The difficulty part is how
>>>>> to parallelize the ingestion task. With Spark when I have multiple workers
>>>>> working on the same job, I don't know if there is a way and how to
>>>>> dynamically change the row range each worker should process in realtime...
>>>>>
>>>>> I tried to find out if there is any candidate available out of the
>>>>> box, instead of reinventing the wheel. At this moment I have not 
>>>>> discovered

Re: [Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?

2017-01-03 Thread ayan guha
Hi

You need to store and capture the Max of the column you intend to use for
identifying new records (Ex: INSERTED_ON) after every successful run of
your job. Then, use the value in lowerBound option.

Essentially, you want to create a query like

select * from table where INSERTED_ON > lowerBound and
INSERTED_ON <= <the max value you captured> every time you run the job

On Wed, Jan 4, 2017 at 2:13 AM, Yuanzhe Yang <yyz1...@gmail.com> wrote:

> Hi Ayan,
>
> Thanks a lot for your suggestion. I am currently looking into sqoop.
>
> Concerning your suggestion for Spark, it is indeed parallelized with
> multiple workers, but the job is one-off and cannot keep streaming.
> Moreover, I cannot specify any "start row" in the job, it will always
> ingest the entire table. So I also cannot simulate a streaming process by
> starting the job in fix intervals...
>
> Best regards,
> Yang
>
> 2017-01-03 15:06 GMT+01:00 ayan guha <guha.a...@gmail.com>:
>
>> Hi
>>
>> While the solutions provided by others look promising and I'd like to
>> try out a few of them, our old pal sqoop already "does" the job. It has an
>> incremental mode where you can provide a --check-column and
>> --last-modified-value combination to grab the data - and yes, sqoop
>> essentially does it by running a MAP-only job which spawns number of
>> parallel map task to grab data from DB.
>>
>> In Spark, you can use sqlContext.load function for JDBC and use
>> partitionColumn and numPartition to define parallelism of connection.
>>
>> Best
>> Ayan
>>
>> On Tue, Jan 3, 2017 at 10:49 PM, Yuanzhe Yang <yyz1...@gmail.com> wrote:
>>
>>> Hi Ayan,
>>>
>>> Thanks a lot for such a detailed response. I really appreciate it!
>>>
>>> I think this use case can be generalized, because the data is immutable
>>> and append-only. We only need to find one column or timestamp to track the
>>> last row consumed in the previous ingestion. This pattern should be common
>>> when storing sensor data. If the data is mutable, then the solution will be
>>> surely difficult and vendor specific as you said.
>>>
>>> The workflow you proposed is very useful. The difficulty part is how to
>>> parallelize the ingestion task. With Spark when I have multiple workers
>>> working on the same job, I don't know if there is a way and how to
>>> dynamically change the row range each worker should process in realtime...
>>>
>>> I tried to find out if there is any candidate available out of the box,
>>> instead of reinventing the wheel. At this moment I have not discovered any
>>> existing tool can parallelize ingestion tasks on one database. Is Sqoop a
>>> proper candidate from your knowledge?
>>>
>>> Thank you again and have a nice day.
>>>
>>> Best regards,
>>> Yang
>>>
>>>
>>>
>>> 2016-12-30 8:28 GMT+01:00 ayan guha <guha.a...@gmail.com>:
>>>
>>>>
>>>>
>>>> "If data ingestion speed is faster than data production speed, then
>>>> eventually the entire database will be harvested and those workers will
>>>> start to "tail" the database for new data streams and the processing
>>>> becomes real time."
>>>>
>>>> This part is really database dependent. So it will be hard to
>>>> generalize it. For example, say you have a batch interval of 10
>>>> secs... what happens if you get more than one update on the same row
>>>> within 10 secs? You will get a snapshot of every 10 secs. Now, different
>>>> databases provide different mechanisms to expose all DML changes: MySQL has
>>>> binlogs, Oracle has log shipping, CDC, GoldenGate and so on... typically it
>>>> requires a new product or new licenses and most likely a new component
>>>> installation on the production db :)
>>>>
>>>> So, if we keep real CDC solutions out of scope, a simple snapshot
>>>> solution can be achieved fairly easily by
>>>>
>>>> 1. Adding INSERTED_ON and UPDATED_ON columns on the source table(s).
>>>> 2. Keeping a simple table level check pointing (TABLENAME,TS_MAX)
>>>> 3. Running an extraction/load mechanism which will take data from DB
>>>> (where INSERTED_ON > TS_MAX or UPDATED_ON>TS_MAX) and put to HDFS. This can
>>>> be sqoop,spark,ETL tool like informatica,ODI,SAP etc. In addition, you can
>>>> directly write to Kafka as well. Sqoop, Spark supports Kafka. Most of the
>>>> ETL tools would too...
>>>> 4. Finally, update check point...
>>>>
>>>> You m

Re: Broadcast Join and Inner Join giving different result on same DataFrame

2017-01-03 Thread ayan guha
I think productBroadcastDF is broadcast variable in your case, not the DF
itself. Try the join with productBroadcastDF.value.
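
For reference, a minimal sketch of the DataFrame-API broadcast join being
discussed; the two frames are tiny stand-ins for the poster's data and assume a
SparkSession named spark:

  import org.apache.spark.sql.functions.broadcast
  import spark.implicits._

  val transactionDF = Seq((1, 100.0), (2, 30.0)).toDF("productId", "amount")
  val productDF     = Seq((1, "apples"), (2, "pears")).toDF("productId", "name")

  // broadcast() adds a broadcast hint for the planner on the smaller frame
  val broadcastJoin = transactionDF.join(broadcast(productDF), "productId")
  val innerJoin     = transactionDF.join(productDF, "productId")
  broadcastJoin.show()        // both joins should return the same rows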

On Wed, Jan 4, 2017 at 1:04 AM, Patrick <titlibat...@gmail.com> wrote:

> Hi,
>
> An Update on above question: In Local[*] mode code is working fine. The
> Broadcast size is 200MB, but on Yarn it the broadcast join is giving empty
> result.But in Sql Query in UI, it does show BroadcastHint.
>
> Thanks
>
>
> On Fri, Dec 30, 2016 at 9:15 PM, titli batali <titlibat...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I have two dataframes which has common column Product_Id on which i have
>> to perform a join operation.
>>
>> val transactionDF = readCSVToDataFrame(sqlCtx: SQLContext,
>> pathToReadTransactions: String, transactionSchema: StructType)
>> val productDF = readCSVToDataFrame(sqlCtx: SQLContext,
>> pathToReadProduct:String, productSchema: StructType)
>>
>> As, transaction data is very large but product data is small, i would
>> ideally do a  broadcast join where i braodcast productDF.
>>
>>  val productBroadcastDF =  broadcast(productDF)
>>  val broadcastJoin = transcationDF.join(productBroadcastDF,
>> "productId")
>>
>> Or simply,  val innerJoin = transcationDF.join(productDF, "productId")
>> should give the same result as above.
>>
>> But If i join using simple inner join i get  dataframe  with joined
>> values whereas if i do broadcast join i get empty dataframe with empty
>> values. I am not able to explain this behavior. Ideally both should give
>> the same result.
>>
>> What could have gone wrong. Any one faced the similar issue?
>>
>>
>> Thanks,
>> Prateek
>>
>>
>>
>>
>>
>
>


-- 
Best Regards,
Ayan Guha


Re: [Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?

2017-01-03 Thread ayan guha
Hi

While the solutions provided by others look promising and I'd like to try
out a few of them, our old pal sqoop already "does" the job. It has an
incremental mode where you can provide a --check-column and
--last-modified-value combination to grab the data - and yes, sqoop
essentially does it by running a MAP-only job which spawns number of
parallel map task to grab data from DB.

In Spark, you can use sqlContext.load function for JDBC and use
partitionColumn and numPartition to define parallelism of connection.

Best
Ayan

On Tue, Jan 3, 2017 at 10:49 PM, Yuanzhe Yang <yyz1...@gmail.com> wrote:

> Hi Ayan,
>
> Thanks a lot for such a detailed response. I really appreciate it!
>
> I think this use case can be generalized, because the data is immutable
> and append-only. We only need to find one column or timestamp to track the
> last row consumed in the previous ingestion. This pattern should be common
> when storing sensor data. If the data is mutable, then the solution will be
> surely difficult and vendor specific as you said.
>
> The workflow you proposed is very useful. The difficulty part is how to
> parallelize the ingestion task. With Spark when I have multiple workers
> working on the same job, I don't know if there is a way and how to
> dynamically change the row range each worker should process in realtime...
>
> I tried to find out if there is any candidate available out of the box,
> instead of reinventing the wheel. At this moment I have not discovered any
> existing tool can parallelize ingestion tasks on one database. Is Sqoop a
> proper candidate from your knowledge?
>
> Thank you again and have a nice day.
>
> Best regards,
> Yang
>
>
>
> 2016-12-30 8:28 GMT+01:00 ayan guha <guha.a...@gmail.com>:
>
>>
>>
>> "If data ingestion speed is faster than data production speed, then
>> eventually the entire database will be harvested and those workers will
>> start to "tail" the database for new data streams and the processing
>> becomes real time."
>>
>> This part is really database dependent. So it will be hard to generalize
>> it. For example, say you have a batch interval of 10 secs... what happens
>> if you get more than one update on the same row within 10 secs? You will
>> get a snapshot of every 10 secs. Now, different databases provide different
>> mechanisms to expose all DML changes: MySQL has binlogs, Oracle has log
>> shipping, CDC, GoldenGate and so on... typically it requires a new product or
>> new licenses and most likely a new component installation on the production db :)
>>
>> So, if we keep real CDC solutions out of scope, a simple snapshot
>> solution can be achieved fairly easily by
>>
>> 1. Adding INSERTED_ON and UPDATED_ON columns on the source table(s).
>> 2. Keeping a simple table level check pointing (TABLENAME,TS_MAX)
>> 3. Running an extraction/load mechanism which will take data from DB
>> (where INSERTED_ON > TS_MAX or UPDATED_ON>TS_MAX) and put to HDFS. This can
>> be sqoop,spark,ETL tool like informatica,ODI,SAP etc. In addition, you can
>> directly write to Kafka as well. Sqoop, Spark supports Kafka. Most of the
>> ETL tools would too...
>> 4. Finally, update check point...
>>
>> You may "determine" checkpoint from the data you already have in HDFS if
>> you create a Hive structure on it.
>>
>> Best
>> AYan
>>
>>
>>
>> On Fri, Dec 30, 2016 at 4:45 PM, 任弘迪 <ryan.hd@gmail.com> wrote:
>>
>>> why not sync binlog of mysql(hopefully the data is immutable and the
>>> table is append-only), send the log through kafka and then consume it by
>>> spark streaming?
>>>
>>> On Fri, Dec 30, 2016 at 9:01 AM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
>>>> We don't support this yet, but I've opened this JIRA as it sounds
>>>> generally useful: https://issues.apache.org/jira/browse/SPARK-19031
>>>>
>>>> In the mean time you could try implementing your own Source, but that
>>>> is pretty low level and is not yet a stable API.
>>>>
>>>> On Thu, Dec 29, 2016 at 4:05 AM, "Yuanzhe Yang (杨远哲)" <
>>>> yyz1...@gmail.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> Thanks a lot for your contributions to bring us new technologies.
>>>>>
>>>>> I don't want to waste your time, so before I write to you, I googled,
>>>>> checked stackoverflow and mailing list archive with keywords "streaming"
>>>>> and "jdbc".

Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_ piece0 of broadcast_1 in Spark 2.0.0"

2016-12-30 Thread ayan guha
ote:
>
> Hi Apache Spark User team,
>
>
>
> Greetings!
>
> I started developing an application using Apache Hadoop and Spark using
> python. My pyspark application randomly terminated saying "Failed to get
> broadcast_1*" and I have been searching for suggestion and support in
> Stakeoverflow at Failed to get broadcast_1_piece0 of broadcast_1 in
> pyspark application
> <http://stackoverflow.com/questions/41236661/failed-to-get-broadcast-1-piece0-of-broadcast-1-in-pyspark-application>
>
>
> Failed to get broadcast_1_piece0 of broadcast_1 in pyspark application
> I was building an application on Apache Spark 2.00 with Python 3.4 and
> trying to load some CSV files from HDFS (...
>
> <http://stackoverflow.com/questions/41236661/failed-to-get-broadcast-1-piece0-of-broadcast-1-in-pyspark-application>
>
>
> Could you please provide suggestion registering myself in Apache User list
> or how can I get suggestion or support to debug the problem I am facing?
>
> Your response will be highly appreciated.
>
>
>
> Thanks & Best Regards,
> Engr. Palash Gupta
> WhatsApp/Viber: +8801817181502
> Skype: palash2494
>
>
>
>
>
>


-- 
Best Regards,
Ayan Guha


Re: [Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?

2016-12-29 Thread ayan guha
"If data ingestion speed is faster than data production speed, then
eventually the entire database will be harvested and those workers will
start to "tail" the database for new data streams and the processing
becomes real time."

This part is really database dependent. So it will be hard to generalize
it. For example, say you have a batch interval of 10 secs... what happens
if you get more than one update on the same row within 10 secs? You will
get a snapshot of every 10 secs. Now, different databases provide different
mechanisms to expose all DML changes: MySQL has binlogs, Oracle has log
shipping, CDC, GoldenGate and so on... typically it requires a new product or
new licenses and most likely a new component installation on the production db :)

So, if we keep real CDC solutions out of scope, a simple snapshot solution
can be achieved fairly easily by

1. Adding INSERTED_ON and UPDATED_ON columns on the source table(s).
2. Keeping a simple table level check pointing (TABLENAME,TS_MAX)
3. Running an extraction/load mechanism which will take data from DB (where
INSERTED_ON > TS_MAX or UPDATED_ON>TS_MAX) and put to HDFS. This can be
sqoop,spark,ETL tool like informatica,ODI,SAP etc. In addition, you can
directly write to Kafka as well. Sqoop, Spark supports Kafka. Most of the
ETL tools would too...
4. Finally, update check point...

You may "determine" checkpoint from the data you already have in HDFS if
you create a Hive structure on it.
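
A hedged sketch of steps 1-4 above in Spark; the checkpoint table, column
names, JDBC URL and target path are all illustrative, and `spark` is assumed to
be a SparkSession:

  // 2. read the last checkpoint for this table
  val tsMax = spark.read.format("jdbc")
    .option("url", jdbcUrl).option("dbtable", "checkpoints").load()
    .where("tablename = 'mytable'")
    .select("ts_max").first().getTimestamp(0)

  // 3. pull only rows inserted or updated since the checkpoint
  //    (step 1 added the INSERTED_ON / UPDATED_ON columns on the source table)
  val delta = spark.read.format("jdbc")
    .option("url", jdbcUrl)
    .option("dbtable",
      s"(select * from mytable where inserted_on > '$tsMax' or updated_on > '$tsMax') t")
    .load()

  delta.write.mode("append").parquet("/data/mytable")   // land the delta on HDFS

  // 4. finally, write max(inserted_on/updated_on) of `delta` back to the checkpoint table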

Best
AYan



On Fri, Dec 30, 2016 at 4:45 PM, 任弘迪 <ryan.hd@gmail.com> wrote:

> why not sync binlog of mysql(hopefully the data is immutable and the table
> is append-only), send the log through kafka and then consume it by spark
> streaming?
>
> On Fri, Dec 30, 2016 at 9:01 AM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> We don't support this yet, but I've opened this JIRA as it sounds
>> generally useful: https://issues.apache.org/jira/browse/SPARK-19031
>>
>> In the mean time you could try implementing your own Source, but that is
>> pretty low level and is not yet a stable API.
>>
>> On Thu, Dec 29, 2016 at 4:05 AM, "Yuanzhe Yang (杨远哲)" <yyz1...@gmail.com>
>> wrote:
>>
>>> Hi all,
>>>
>>> Thanks a lot for your contributions to bring us new technologies.
>>>
>>> I don't want to waste your time, so before I write to you, I googled,
>>> checked stackoverflow and mailing list archive with keywords "streaming"
>>> and "jdbc". But I was not able to get any solution to my use case. I hope I
>>> can get some clarification from you.
>>>
>>> The use case is quite straightforward, I need to harvest a relational
>>> database via jdbc, do something with data, and store result into Kafka. I
>>> am stuck at the first step, and the difficulty is as follows:
>>>
>>> 1. The database is too large to ingest with one thread.
>>> 2. The database is dynamic and time series data comes in constantly.
>>>
>>> Then an ideal workflow is that multiple workers process partitions of
>>> data incrementally according to a time window. For example, the processing
>>> starts from the earliest data with each batch containing data for one hour.
>>> If data ingestion speed is faster than data production speed, then
>>> eventually the entire database will be harvested and those workers will
>>> start to "tail" the database for new data streams and the processing
>>> becomes real time.
>>>
>>> With Spark SQL I can ingest data from a JDBC source with partitions
>>> divided by time windows, but how can I dynamically increment the time
>>> windows during execution? Assume that there are two workers ingesting data
>>> of 2017-01-01 and 2017-01-02, the one which finishes quicker gets next task
>>> for 2017-01-03. But I am not able to find out how to increment those values
>>> during execution.
>>>
>>> Then I looked into Structured Streaming. It looks much more promising
>>> because window operations based on event time are considered during
>>> streaming, which could be the solution to my use case. However, from
>>> documentation and code example I did not find anything related to streaming
>>> data from a growing database. Is there anything I can read to achieve my
>>> goal?
>>>
>>> Any suggestion is highly appreciated. Thank you very much and have a
>>> nice day.
>>>
>>> Best regards,
>>> Yang
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>


-- 
Best Regards,
Ayan Guha


Re: Best way to process lookup ETL with Dataframes

2016-12-29 Thread ayan guha
How about this -

select a.*, nvl(b.col,nvl(c.col,'some default'))
from driving_table a
left outer join lookup1 b on a.id=b.id
left outer join lookup2 c on a.id=c.id

?
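
The same cascading-lookup idea as a small DataFrame-API sketch (frame and
column names are illustrative, and a SparkSession named spark is assumed);
coalesce() picks the first non-null lookup result, so no split/merge is needed:

  import org.apache.spark.sql.functions.{coalesce, lit}
  import spark.implicits._

  val driving = Seq(1, 2, 3).toDF("id")
  val lookup1 = Seq((1, "from-lookup1")).toDF("id", "col1")
  val lookup2 = Seq((2, "from-lookup2")).toDF("id", "col2")

  val result = driving.as("a")
    .join(lookup1.as("b"), $"a.id" === $"b.id", "left_outer")
    .join(lookup2.as("c"), $"a.id" === $"c.id", "left_outer")
    .select($"a.id", coalesce($"b.col1", $"c.col2", lit("some default")).as("col"))
  result.show()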

On Fri, Dec 30, 2016 at 9:55 AM, Sesterhenn, Mike <msesterh...@cars.com>
wrote:

> Hi all,
>
>
> I'm writing an ETL process with Spark 1.5, and I was wondering the best
> way to do something.
>
>
> A lot of the fields I am processing require an algorithm similar to this:
>
>
> Join input dataframe to a lookup table.
>
> if (that lookup fails (the joined fields are null)) {
>
> Lookup into some other table to join some other fields.
>
> }
>
>
> With Dataframes, it seems the only way to do this is to do something like
> this:
>
>
> Join input dataframe to a lookup table.
>
> if (that lookup fails (the joined fields are null)) {
>
>*SPLIT the dataframe into two DFs via DataFrame.filter(),
>
>   one group with successful lookup, the other failed).*
>
>For failed lookup:  {
>
>Lookup into some other table to grab some other fields.
>
>}
>
>*MERGE the dataframe splits back together via DataFrame.unionAll().*
> }
>
>
> I'm seeing some really large execution plans as you might imagine in the
> Spark Ui, and the processing time seems way out of proportion with the size
> of the dataset.  (~250GB in 9 hours).
>
>
> Is this the best approach to implement an algorithm like this?  Note also
> that some fields I am implementing require multiple staged split/merge
> steps due to cascading lookup joins.
>
>
> Thanks,
>
>
> *Michael Sesterhenn*
>
>
> *msesterh...@cars.com <msesterh...@cars.com> *
>
>


-- 
Best Regards,
Ayan Guha


Merri Christmas and Happy New Year

2016-12-25 Thread ayan guha
Merry Christmas to all Spark users.

May your new year be more "structured" with "streaming" success :)


Re: Has anyone managed to connect to Oracle via JDBC from Spark CDH 5.5.2

2016-12-21 Thread ayan guha
Try providing correct driver name through property variable in the jdbc
call.
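
For example, a sketch based on the snippet quoted below, adding an explicit
driver class (the class name is the one shipped in ojdbc6.jar):

  val s = HiveContext.read.format("jdbc").options(
    Map("url" -> _ORACLEserver,
        "driver" -> "oracle.jdbc.OracleDriver",   // explicit driver class
        "dbtable" -> "(SELECT ... FROM scratchpad.dummy)",
        "user" -> _username,
        "password" -> _password)).load

This is usually what the "No suitable driver" error on the older build is
asking for.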
On Thu., 22 Dec. 2016 at 8:40 am, Mich Talebzadeh 
wrote:

> This works with Spark 2 with the Oracle jar file added to
>
> $SPARK_HOME/conf/spark-defaults.conf
>
> spark.driver.extraClassPath      /home/hduser/jars/ojdbc6.jar
> spark.executor.extraClassPath    /home/hduser/jars/ojdbc6.jar
>
> and you get
>
>  scala> val s = HiveContext.read.format("jdbc").options(
>  | Map("url" -> _ORACLEserver,
>  | "dbtable" -> "(SELECT to_char(ID) AS ID, to_char(CLUSTERED) AS
> CLUSTERED, to_char(SCATTERED) AS SCATTERED, to_char(RANDOMISED) AS
> RANDOMISED, RANDOM_STRING, SMALL_VC, PADDING FROM scratchpad.dummy)",
>  | "partitionColumn" -> "ID",
>  | "lowerBound" -> "1",
>  | "upperBound" -> "1",
>  | "numPartitions" -> "10",
>  | "user" -> _username,
>  | "password" -> _password)).load
> s: org.apache.spark.sql.DataFrame = [ID: string, CLUSTERED: string ... 5
> more fields]
>
>
>
>
> that works.
> However, with CDH 5.5.2 (Spark 1.5) it fails with error
>
> *java.sql.SQLException: No suitable driver*
>
> at java.sql.DriverManager.getDriver(DriverManager.java:315)
> at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$2.apply(JdbcUtils.scala:54)
> at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$2.apply(JdbcUtils.scala:54)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createConnectionFactory(JdbcUtils.scala:53)
> at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:123)
> at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:117)
> at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:53)
> at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:315)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:122)
>
> Any ideas?
>
> Thanks
>
> Dr Mich Talebzadeh
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>


Re: How to perform Join operation using JAVARDD

2016-12-19 Thread ayan guha
What's your desired output?
On Sat., 17 Dec. 2016 at 9:50 pm, Sree Eedupuganti  wrote:

> I tried like this,
>
> *CrashData_1.csv:*
>
> *CRASH_KEY   CRASH_NUMBER   CRASH_DATE                 CRASH_MONTH*
> *2016899114  2016899114     01/02/2016 12:00:00 AM +*
>
> *CrashData_2.csv:*
>
> *CITY_NAME  ZIPCODE  CITY          STATE*
> *1945       704      PARC PARQUE   PR*
>
>
> Code:
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> *JavaRDD<String> firstRDD =
> sc.textFile("/Users/apple/Desktop/CrashData_1.csv");*
>
> *JavaRDD<String> secondRDD =
> sc.textFile("/Users/apple/Desktop/CrashData_2.csv");*
>
> *JavaRDD<String> allRDD = firstRDD.union(secondRDD);*
>
>
> *Output i am getting:*
>
> *[CRASH_KEY,CRASH_NUMBER,CRASH_DATE,CRASH_MONTH,
> 2016899114,2016899114,01/02/2016 12:00:00 AM + *
>
> *CITY_NAME,ZIPCODE,CITY,STATE, **1945,704,PARC PARQUE,PR]*
>
>
>
>
> *Any suggestions please, thanks in advance*
>
>
>


Re: How to get recent value in spark dataframe

2016-12-19 Thread ayan guha
You have 2 parts to it

1. Do a sub query where for each primary key derive latest value of flag=1
records. Ensure you get exactly 1 record per primary key value. Here you
can use rank() over (partition by primary key order by year desc)

2. Join your original dataset with the above on primary key. If year is
higher than latest flag=1 record then take it else mark it null

If you can have primary keys which may have no flag=1 records then they
wouldn't show up in set 1 above. So if you still want them in result then
adjust 1 accordingly.
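
A hedged sketch of those two steps with Spark SQL window functions, using the
column names from the example quoted below (id, flag, price, date) and assuming
the data frame is registered as a temp view named t on Spark 2.x:

  df.createOrReplaceTempView("t")              // `df` is the original data frame

  val result = spark.sql("""
    SELECT t.id, t.flag, t.price, t.date,
           CASE WHEN t.flag = 0 AND t.date > f.flag1_date
                THEN f.price END AS new_column
    FROM t
    LEFT OUTER JOIN (
      SELECT id, price, date AS flag1_date
      FROM (SELECT id, price, date,
                   rank() OVER (PARTITION BY id ORDER BY date DESC) AS r
            FROM t WHERE flag = 1) x
      WHERE r = 1) f
    ON t.id = f.id
  """)
  // If ties on date are possible, use row_number() instead of rank() so each id
  // keeps exactly one flag=1 row.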

Best
Ayan
On Mon., 19 Dec. 2016 at 1:01 pm, Richard Xin
 wrote:

> I am not sure I understood your logic, but it seems to me that you could
> take a look of Hive's Lead/Lag functions.
>
>
> On Monday, December 19, 2016 1:41 AM, Milin korath <
> milin.kor...@impelsys.com> wrote:
>
>
> Thanks, I tried with a left outer join. My dataset has around 400M
> records and a lot of shuffling is happening. Is there any other workaround
> apart from a join? I tried using a window function but I am not getting a
> proper solution.
>
>
> Thanks
>
> On Sat, Dec 17, 2016 at 4:55 AM, Michael Armbrust 
> wrote:
>
> Oh and to get the null for missing years, you'd need to do an outer join
> with a table containing all of the years you are interested in.
>
> On Fri, Dec 16, 2016 at 3:24 PM, Michael Armbrust 
> wrote:
>
> Are you looking for argmax? Here is an example
> 
> .
>
> On Wed, Dec 14, 2016 at 8:49 PM, Milin korath 
> wrote:
>
> Hi
>
> I have a spark data frame with following structure
>
>   id  flag  price  date
>   a   0     100    2015
>   a   0     50     2015
>   a   1     200    2014
>   a   1     300    2013
>   a   0     400    2012
>
> I need to create a data frame with recent value of flag 1 and updated in
> the flag 0 rows.
>
>   id  flag  price  date  new_column
>   a   0     100    2015  200
>   a   0     50     2015  200
>   a   1     200    2014  null
>   a   1     300    2013  null
>   a   0     400    2012  null
>
> We have 2 rows having flag=0. Consider the first row(flag=0),I will have 2
> values(200 and 300) and I am taking the recent one 200(2014). And the last
> row I don't have any recent value for flag 1 so it is updated with null.
> Looking for a solution using scala. Any help would be appreciated.Thanks
>
> Thanks
> Milin
>
>
>
>
>
>
>
>
>
>
>
>


Re: Wrting data from Spark streaming to AWS Redshift?

2016-12-09 Thread ayan guha
Ideally, saving data to external sources should not be any different. Give
the write options as stated in the blog a shot, but change the mode to append.
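
A hedged sketch of an append from a DStream using the spark-redshift data
source from that blog; option names follow that library's documentation, the
URL/paths are placeholders, and `stream` is assumed to be a DStream of a case
class so toDF() works:

  stream.foreachRDD { rdd =>
    import spark.implicits._                              // SparkSession assumed in scope
    rdd.toDF().write
      .format("com.databricks.spark.redshift")
      .option("url", "jdbc:redshift://host:5439/db?user=u&password=p")
      .option("dbtable", "events")
      .option("tempdir", "s3n://bucket/tmp")              // S3 staging area the connector uses
      .option("forward_spark_s3_credentials", "true")     // credential handling varies by version
      .mode("append")                                     // append rather than overwrite
      .save()
  }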

On Sat, Dec 10, 2016 at 8:25 AM, shyla deshpande <deshpandesh...@gmail.com>
wrote:

> Hello all,
>
> Is it possible to Write data from Spark streaming to AWS Redshift?
>
> I came across the following article, so looks like it works from a Spark
> batch program.
>
> https://databricks.com/blog/2015/10/19/introducing-
> redshift-data-source-for-spark.html
>
> I want to write to AWS Redshift from Spark Stream. Please share your
> experience and reference docs.
>
> Thanks
>



-- 
Best Regards,
Ayan Guha


[no subject]

2016-12-06 Thread ayan guha
Hi

We are generating some big model objects


> hdfs dfs -du -h /myfolder
325  975  /myfolder/__ORCHMETA__
1.7 M5.0 M/myfolder/model
185.3 K  555.9 K  /myfolder/predict

The issue I am facing while loading is

Error in .jcall("com/oracle/obx/df/OBXSerializer", returnSig =
"Ljava/lang/Object;",  :
  java.io.StreamCorruptedException: invalid type code: 00
Calls: orch.load.model ->  -> .jcall -> .jcheck -> .Call
Execution halted


As you can guess, it is from Oracle's R for Hadoop (ORAAH) product. I am
going to raise a ticket with Oracle, but thought to check here as well if
there is any solution available.



-- 
Best Regards,
Ayan Guha


Re: Access multiple cluster

2016-12-04 Thread ayan guha
Thank you guys. I will try JDBC route if I get access and let you know.

On Mon, Dec 5, 2016 at 5:17 PM, Jörn Franke <jornfra...@gmail.com> wrote:

> If you do it frequently then you may simply copy the data to the
> processing cluster. Alternatively, you could create an external table in
> the processing cluster to the analytics cluster. However, this has to be
> supported by appropriate security configuration and might be less
> efficient than copying the data.
>
> On 4 Dec 2016, at 22:45, ayan guha <guha.a...@gmail.com> wrote:
>
> Hi
>
> Is it possible to access hive tables sitting on multiple clusters in a
> single spark application?
>
> We have a data processing cluster and analytics cluster. I want to join a
> table from analytics cluster with another table in processing cluster and
> finally write back in analytics cluster.
>
> Best
> Ayan
>
>


-- 
Best Regards,
Ayan Guha


Access multiple cluster

2016-12-04 Thread ayan guha
Hi

Is it possible to access hive tables sitting on multiple clusters in a
single spark application?

We have a data processing cluster and analytics cluster. I want to join a
table from analytics cluster with another table in processing cluster and
finally write back in analytics cluster.

Best
Ayan


Re: Spark sql generated dynamically

2016-12-02 Thread ayan guha
You are looking for window functions.
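
A small illustration of a group-wise computation with a window function instead
of generated SQL or a custom UDAF (column names are made up, and a SparkSession
named spark is assumed):

  import org.apache.spark.sql.expressions.Window
  import org.apache.spark.sql.functions.avg
  import spark.implicits._

  val df = Seq(("a", 1.0), ("a", 3.0), ("b", 10.0)).toDF("grp", "x")

  val w = Window.partitionBy("grp")
  df.withColumn("grp_avg", avg($"x").over(w))    // per-group aggregate on every row
    .withColumn("diff_from_avg", $"x" - $"grp_avg")
    .show()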
On 2 Dec 2016 22:33, "Georg Heiler"  wrote:

> Hi,
>
> how can I perform a group wise operation in spark more elegant? Possibly
> dynamically generate SQL? Or would you suggest a custom UADF?
> http://stackoverflow.com/q/40930003/2587904
>
> Kind regards,
> Georg
>


Re: [structured streaming] How to remove outdated data when use Window Operations

2016-12-01 Thread ayan guha
Thanks TD. Will it be available in pyspark too?
On 1 Dec 2016 19:55, "Tathagata Das"  wrote:

> In the meantime, if you are interested, you can read the design doc in the
> corresponding JIRA - https://issues.apache.org/jira/browse/SPARK-18124
>
> On Thu, Dec 1, 2016 at 12:53 AM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> That feature is coming in 2.1.0. We have added watermarking, that will
>> track the event time of the data and accordingly close old windows, output
>> its corresponding aggregate and then drop its corresponding state. But in
>> that case, you will have to use append mode, and aggregated data of a
>> particular window will be evicted only when the windows is closed. You will
>> be able to control the threshold on how long to wait for late, out-of-order
>> data before closing a window.
>>
>> We will be updated the docs soon to explain this.
>>
>> On Tue, Nov 29, 2016 at 8:30 PM, Xinyu Zhang  wrote:
>>
>>> Hi
>>>
>>> I want to use window operations. However, if i don't remove any data,
>>> the "complete" table will become larger and larger as time goes on. So I
>>> want to remove some outdated data in the complete table that I would never
>>> use.
>>> Is there any method to meet my requirement?
>>>
>>> Thanks!
>>>
>>>
>>>
>>>
>>>
>>
>>
>


Re: Spark Job not exited and shows running

2016-11-30 Thread ayan guha
Can you add sc.stop at the end of the code and try?
On 1 Dec 2016 18:03, "Daniel van der Ende" 
wrote:

> Hi,
>
> I've seen this a few times too. Usually it indicates that your driver
> doesn't have enough resources to process the result. Sometimes increasing
> driver memory is enough (yarn memory overhead can also help). Is there any
> specific reason for you to run in client mode and not in cluster mode?
> Having run into this a number of times (and wanting to spare the resources
> of our submitting machines) we have now switched to use yarn cluster mode
> by default. This seems to resolve the problem.
>
> Hope this helps,
>
> Daniel
>
> On 29 Nov 2016 11:20 p.m., "Selvam Raman"  wrote:
>
>> Hi,
>>
>> I have submitted spark job in yarn client mode. The executor and cores
>> were dynamically allocated. In the job i have 20 partitions, so 5 container
>> each with 4 core has been submitted. It almost processed all the records
>> but it never exit the job and in the application master container i am
>> seeing the below error message.
>>
>>  INFO yarn.YarnAllocator: Canceling requests for 0 executor containers
>>  WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
>>
>>
>>
>> ​The same job i ran it for only 1000 records which successfully finished.
>> ​
>>
>> Can anyone help me to sort out this issue.
>>
>> Spark version:2.0( AWS EMR).
>>
>> --
>> Selvam Raman
>> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>>
>


Re: time to run Spark SQL query

2016-11-28 Thread ayan guha
They should take the same time if everything else is constant.
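
A quick way to check this: both forms should compile to an equivalent optimized
plan, which explain() will show. A tiny sketch, assuming a Spark 2.x
SparkSession named spark:

  import spark.implicits._

  val df = Seq(10, 20, 30).toDF("column_name")
  df.createOrReplaceTempView("tablename")

  val viaSql = spark.sql("SELECT column_name FROM tablename WHERE column_name > 15")
  val viaApi = df.select("column_name").where($"column_name" > 15)

  viaSql.explain(true)   // compare the physical plan printed here...
  viaApi.explain(true)   // ...with this one; they should be equivalent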
On 28 Nov 2016 23:41, "Hitesh Goyal"  wrote:

> Hi team, I am using spark SQL for accessing the amazon S3 bucket data.
>
> If I run a sql query by using normal SQL syntax like below
>
> 1)  DataFrame d=sqlContext.sql(i.e. Select * from tablename where
> column_condition);
>
>
>
> Secondly, if I use dataframe functions for the same query like below :-
>
> 2)  dataframe.select(column_name).where(column_condition);
>
>
>
> Now there is a question arising in my mind that which query would take
> more time to execute if I run both on the same dataset.
>
> Or both would execute in the same time duration. Please suggest your
> answer.
>
> Regards,
>
> *Hitesh Goyal*
>
> Simpli5d Technologies
>
> Cont No.: 9996588220
>
>
>


Re: RDD Partitions on HDFS file in Hive on Spark Query

2016-11-22 Thread ayan guha
Hi

RACK_LOCAL = Task running on the same rack but not on the same node where
data is
NODE_LOCAL = task and data is co-located. Probably you were looking for
this one?

GZIP - Read is through GZIP codec, but because it is non-splittable, so you
can have atmost 1 task reading a gzip file. Now, the content of gzip may be
across multiple node. Ex: GZIP file of say 1GB and block size is 256 MB (ie
4 blocks). Assume not all 4 blocks are on same data node.

When you start reading the gzip file, 1 task will be assigned. It will read
local blocks if available, and it will read remote blocks (streaming read).
While reading the stream, gzip codec will uncompress the data.

This really is not a Spark thing, but a Hadoop input format
discussion.

HTH?

On Wed, Nov 23, 2016 at 10:00 AM, yeshwanth kumar <yeshwant...@gmail.com>
wrote:

> Hi Ayan,
>
> we have  default rack topology.
>
>
>
> -Yeshwanth
> Can you Imagine what I would do if I could do all I can - Art of War
>
> On Tue, Nov 22, 2016 at 6:37 AM, ayan guha <guha.a...@gmail.com> wrote:
>
>> Because snappy is not splittable, a single task makes sense.
>>
>> Are you sure about the rack topology? I.e., is 225 in a different rack than
>> 227 or 228? What does your topology file say?
>> On 22 Nov 2016 10:14, "yeshwanth kumar" <yeshwant...@gmail.com> wrote:
>>
>>> Thanks for your reply,
>>>
>>> i can definitely change the underlying compression format.
>>> but i am trying to understand the Locality Level,
>>> why executor ran on a different node, where the blocks are not present,
>>> when Locality Level is RACK_LOCAL
>>>
>>> can you shed some light on this.
>>>
>>>
>>> Thanks,
>>> Yesh
>>>
>>>
>>> -Yeshwanth
>>> Can you Imagine what I would do if I could do all I can - Art of War
>>>
>>> On Mon, Nov 21, 2016 at 4:59 PM, Jörn Franke <jornfra...@gmail.com>
>>> wrote:
>>>
>>>> Use as a format orc, parquet or avro because they support any
>>>> compression type with parallel processing. Alternatively split your file in
>>>> several smaller ones. Another alternative would be bzip2 (but slower in
>>>> general) or Lzo (usually it is not included by default in many
>>>> distributions).
>>>>
>>>> On 21 Nov 2016, at 23:17, yeshwanth kumar <yeshwant...@gmail.com>
>>>> wrote:
>>>>
>>>> Hi,
>>>>
>>>> we are running Hive on Spark, we have an external table over snappy
>>>> compressed csv file of size 917.4 M
>>>> HDFS block size is set to 256 MB
>>>>
>>>> as per my Understanding, if i run a query over that external table , it
>>>> should launch 4 tasks. one for each block.
>>>> but i am seeing one executor and one task processing all the file.
>>>>
>>>> trying to understand the reason behind,
>>>>
>>>> i went one step further to understand the block locality
>>>> when i get the block locations for that file, i found
>>>>
>>>> [DatanodeInfoWithStorage[10.11.0.226:50010,DS-bf39d33d-48e1-
>>>> 4a8f-be48-b0953fdaad37,DISK],
>>>>  DatanodeInfoWithStorage[10.11.0.227:50010,DS-a760c1c8-ce0c-
>>>> 4eb8-8183-8d8ff5f24115,DISK],
>>>>  DatanodeInfoWithStorage[10.11.0.228:50010,DS-0e5427e2-b030-
>>>> 43f8-91c9-d8517e68414a,DISK]]
>>>>
>>>> DatanodeInfoWithStorage[10.11.0.226:50010,DS-f50ddf2f-b827-4
>>>> 845-b043-8b91ae4017c0,DISK],
>>>> DatanodeInfoWithStorage[10.11.0.228:50010,DS-e8c9785f-c352-4
>>>> 89b-8209-4307f3296211,DISK],
>>>> DatanodeInfoWithStorage[10.11.0.225:50010,DS-6f6a3ffd-334b-4
>>>> 5fd-ae0f-cc6eb268b0d2,DISK]]
>>>>
>>>> DatanodeInfoWithStorage[10.11.0.226:50010,DS-f8bea6a8-a433-4
>>>> 601-8070-f6c5da840e09,DISK],
>>>> DatanodeInfoWithStorage[10.11.0.227:50010,DS-8aa3f249-790e-4
>>>> 94d-87ee-bcfff2182a96,DISK],
>>>> DatanodeInfoWithStorage[10.11.0.228:50010,DS-d06714f4-2fbb-4
>>>> 8d3-b858-a023b5c44e9c,DISK]
>>>>
>>>> DatanodeInfoWithStorage[10.11.0.226:50010,DS-b3a00781-c6bd-4
>>>> 98c-a487-5ce6aaa66f48,DISK],
>>>> DatanodeInfoWithStorage[10.11.0.228:50010,DS-fa5aa339-e266-4
>>>> e20-a360-e7cdad5dacc3,DISK],
>>>> DatanodeInfoWithStorage[10.11.0.225:50010,DS-9d597d3f-cd4f-4
>>>> c8f-8a13-7be37ce769c9,DISK]]
>>>>
>>>> and in the spark UI i see the Locality Level is  RACK_LOCAL. for that
>>>> task
>>>>
>>>> if it is RACK_LOCAL then it should run either in node 10.11.0.226 or
>>>> 10.11.0.228, because these 2 nodes has all the four blocks needed for
>>>> computation
>>>> but the executor is running in 10.11.0.225
>>>>
>>>> my theory is not applying anywhere.
>>>>
>>>> please help me in understanding how spark/yarn calculates number of
>>>> executors/tasks.
>>>>
>>>> Thanks,
>>>> -Yeshwanth
>>>>
>>>>
>>>
>


-- 
Best Regards,
Ayan Guha


Re: RDD Partitions on HDFS file in Hive on Spark Query

2016-11-22 Thread ayan guha
Because snappy is not splittable, a single task makes sense.

Are you sure about the rack topology? I.e., is 225 in a different rack than
227 or 228? What does your topology file say?
On 22 Nov 2016 10:14, "yeshwanth kumar"  wrote:

> Thanks for your reply,
>
> i can definitely change the underlying compression format.
> but i am trying to understand the Locality Level,
> why executor ran on a different node, where the blocks are not present,
> when Locality Level is RACK_LOCAL
>
> can you shed some light on this.
>
>
> Thanks,
> Yesh
>
>
> -Yeshwanth
> Can you Imagine what I would do if I could do all I can - Art of War
>
> On Mon, Nov 21, 2016 at 4:59 PM, Jörn Franke  wrote:
>
>> Use as a format orc, parquet or avro because they support any compression
>> type with parallel processing. Alternatively split your file in several
>> smaller ones. Another alternative would be bzip2 (but slower in general) or
>> Lzo (usually it is not included by default in many distributions).
>>
>> On 21 Nov 2016, at 23:17, yeshwanth kumar  wrote:
>>
>> Hi,
>>
>> we are running Hive on Spark, we have an external table over snappy
>> compressed csv file of size 917.4 M
>> HDFS block size is set to 256 MB
>>
>> as per my Understanding, if i run a query over that external table , it
>> should launch 4 tasks. one for each block.
>> but i am seeing one executor and one task processing all the file.
>>
>> trying to understand the reason behind,
>>
>> i went one step further to understand the block locality
>> when i get the block locations for that file, i found
>>
>> [DatanodeInfoWithStorage[10.11.0.226:50010,DS-bf39d33d-48e1-
>> 4a8f-be48-b0953fdaad37,DISK],
>>  DatanodeInfoWithStorage[10.11.0.227:50010,DS-a760c1c8-ce0c-
>> 4eb8-8183-8d8ff5f24115,DISK],
>>  DatanodeInfoWithStorage[10.11.0.228:50010,DS-0e5427e2-b030-
>> 43f8-91c9-d8517e68414a,DISK]]
>>
>> DatanodeInfoWithStorage[10.11.0.226:50010,DS-f50ddf2f-b827-4
>> 845-b043-8b91ae4017c0,DISK],
>> DatanodeInfoWithStorage[10.11.0.228:50010,DS-e8c9785f-c352-4
>> 89b-8209-4307f3296211,DISK],
>> DatanodeInfoWithStorage[10.11.0.225:50010,DS-6f6a3ffd-334b-4
>> 5fd-ae0f-cc6eb268b0d2,DISK]]
>>
>> DatanodeInfoWithStorage[10.11.0.226:50010,DS-f8bea6a8-a433-4
>> 601-8070-f6c5da840e09,DISK],
>> DatanodeInfoWithStorage[10.11.0.227:50010,DS-8aa3f249-790e-4
>> 94d-87ee-bcfff2182a96,DISK],
>> DatanodeInfoWithStorage[10.11.0.228:50010,DS-d06714f4-2fbb-4
>> 8d3-b858-a023b5c44e9c,DISK]
>>
>> DatanodeInfoWithStorage[10.11.0.226:50010,DS-b3a00781-c6bd-4
>> 98c-a487-5ce6aaa66f48,DISK],
>> DatanodeInfoWithStorage[10.11.0.228:50010,DS-fa5aa339-e266-4
>> e20-a360-e7cdad5dacc3,DISK],
>> DatanodeInfoWithStorage[10.11.0.225:50010,DS-9d597d3f-cd4f-4
>> c8f-8a13-7be37ce769c9,DISK]]
>>
>> and in the spark UI i see the Locality Level is  RACK_LOCAL. for that task
>>
>> if it is RACK_LOCAL then it should run either in node 10.11.0.226 or
>> 10.11.0.228, because these 2 nodes has all the four blocks needed for
>> computation
>> but the executor is running in 10.11.0.225
>>
>> my theory is not applying anywhere.
>>
>> please help me in understanding how spark/yarn calculates number of
>> executors/tasks.
>>
>> Thanks,
>> -Yeshwanth
>>
>>
>


Re: dataframe data visualization

2016-11-20 Thread ayan guha
Zeppelin with Spark thrift server?

On Mon, Nov 21, 2016 at 1:47 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

> You might take a look at this project (https://github.com/vegas-viz/Vegas),
> it has Spark integration.
>
> Thanks
> Saisai
>
> On Mon, Nov 21, 2016 at 10:23 AM, wenli.o...@alibaba-inc.com <
> wenli.o...@alibaba-inc.com> wrote:
>
>> Hi anyone,
>>
>> is there any easy way for me to do data visualization in spark using
>> scala when data is in dataframe format? Thanks.
>>
>> Wayne Ouyang
>
>
>


-- 
Best Regards,
Ayan Guha


Re: Flume integration

2016-11-20 Thread ayan guha
Hi

While I am following this discussion with interest, I am trying to
comprehend the architectural benefit of a Spark sink.

Is there any feature in Flume that makes it more suitable to ingest stream
data than Spark Streaming, so that we should chain them? For example, does it
help the durability or reliability of the source?

Or is it a more tactical choice based on connector availability or such?

To me, Flume is an important component to ingest streams to HDFS or Hive
directly, i.e. it plays on the batch side of the lambda architecture pattern.
On 20 Nov 2016 22:30, "Mich Talebzadeh"  wrote:

> Hi Ian,
>
> Has this been resolved?
>
> How about data to Flume and then Kafka and Kafka streaming into Spark?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 13 July 2016 at 11:13, Ian Brooks  wrote:
>
>> Hi,
>>
>>
>>
>> I'm currently trying to implement a prototype Spark application that gets
>> data from Flume and processes it. I'm using the pull based method mentioned
>> in https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html
>>
>>
>>
>> The is initially working fine for getting data from Flume, however the
>> Spark client doesn't appear to be letting Flume know that the data has been
>> received, so Flume doesn't remove it from the batch.
>>
>>
>>
>> After 100 requests Flume stops allowing any new data and logs
>>
>>
>>
>> 08 Jul 2016 14:59:00,265 WARN  [Spark Sink Processor Thread - 5]
>> (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  -
>> Error while processing transaction.
>> org.apache.flume.ChannelException: Take list for MemoryTransaction,
>> capacity 100 full, consider committing more frequently, increasing
>> capacity, or increasing thread count
>>at org.apache.flume.channel.MemoryChannel$MemoryTransaction.
>> doTake(MemoryChannel.java:96)
>>
>>
>>
>> My code to pull the data from Flume is
>>
>>
>>
>> SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
>>
>> Duration batchInterval = new Duration(1);
>>
>> final String checkpointDir = "/tmp/";
>>
>>
>>
>> final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
>> batchInterval);
>>
>> ssc.checkpoint(checkpointDir);
>>
>> JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
>> FlumeUtils.createPollingStream(ssc, host, port);
>>
>>
>>
>> // Transform each flume avro event to a process-able format
>>
>> JavaDStream<String> transformedEvents = flumeStream.map(new
>> Function<SparkFlumeEvent, String>() {
>>
>>
>>
>> @Override
>>
>> public String call(SparkFlumeEvent flumeEvent) throws Exception {
>>
>> String flumeEventStr = flumeEvent.event().toString();
>>
>> avroData avroData = new avroData();
>>
>> Gson gson = new GsonBuilder().create();
>>
>> avroData = gson.fromJson(flumeEventStr, avroData.class);
>>
>> HashMap<String, String> body = avroData.getBody();
>>
>> String data = body.get("bytes");
>>
>> return data;
>>
>> }
>>
>> });
>>
>>
>>
>> ...
>>
>>
>>
>> ssc.start();
>>
>> ssc.awaitTermination();
>>
>> ssc.close();
>>
>> }
>>
>>
>>
>> Is there something specific I should be doing to let the Flume server
>> know the batch has been received and processed?
>>
>>
>> --
>>
>> Ian Brooks
>>
>>
>>
>
>


Re: using StreamingKMeans

2016-11-19 Thread ayan guha
Here are 2 concerns I would have with the design (This discussion is mostly
to validate my own understanding)

1. If you have outliers "before" running k-means, don't your centroids get
skewed? In other words, outliers by themselves may bias the cluster
evaluation, don't they?
2. Typically microbatches are small, like 3 sec in your case. In this
window you may not have enough data to run any statistically significant
operation, can you?

My approach would have been: run K-means on data without outliers (in batch
mode). Determine the model, i.e. the centroids in the case of k-means. Then
load the model in your streaming app and just apply an "outlier detection" function,
which takes the form of

def detectOutlier(model,data):
  /// your code, like mean distance etc
  return T or F

In response to your point about an "alternate set of data", I would assume you
would accumulate the data you are receiving from streaming over a few weeks
or months before running offline training.

Am I missing something?

On Sun, Nov 20, 2016 at 10:29 AM, Debasish Ghosh <ghosh.debas...@gmail.com>
wrote:

> Looking for alternative suggestions in case where we have 1 continuous
> stream of data. Offline training and online prediction can be one option if
> we can have an alternate set of data to train. But if it's one single
> stream you don't have separate sets for training or cross validation.
>
> So whatever data you get in each micro batch, train on it and you get the
> cluster centroids from the model. Then apply some heuristics like mean
> distance from centroid and detect outliers. So for every microbatch you get
> the outliers based on the model and you can control forgetfulness of the
> model through the decay factor that you specify for StreamingKMeans.
>
> Suggestions ?
>
> regards.
>
> On Sun, 20 Nov 2016 at 3:51 AM, ayan guha <guha.a...@gmail.com> wrote:
>
>> Curious why do you want to train your models every 3 secs?
>> On 20 Nov 2016 06:25, "Debasish Ghosh" <ghosh.debas...@gmail.com> wrote:
>>
>> Thanks a lot for the response.
>>
>> Regarding the sampling part - yeah that's what I need to do if there's no
>> way of titrating the number of clusters online.
>>
>> I am using something like
>>
>> dstream.foreachRDD { rdd =>
>>   if (rdd.count() > 0) { //.. logic
>>   }
>> }
>>
>> Feels a little odd but if that's the idiom then I will stick to it.
>>
>> regards.
>>
>>
>>
>> On Sat, Nov 19, 2016 at 10:52 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>> So I haven't played around with streaming k means at all, but given
>> that no one responded to your message a couple of days ago, I'll say
>> what I can.
>>
>> 1. Can you not sample out some % of the stream for training?
>> 2. Can you run multiple streams at the same time with different values
>> for k and compare their performance?
>> 3. foreachRDD is fine in general, can't speak to the specifics.
>> 4. If you haven't done any transformations yet on a direct stream,
>> foreachRDD will give you a KafkaRDD.  Checking if a KafkaRDD is empty
>> is very cheap, it's done on the driver only because the beginning and
>> ending offsets are known.  So you should be able to skip empty
>> batches.
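A rough PySpark illustration of points 3 and 4 above (a sketch only; it assumes the stream already carries feature vectors, and the variable names are made up for the example):

from pyspark.mllib.feature import StandardScaler

def process(rdd):
    # cheap emptiness check so an empty microbatch never reaches fit()
    if rdd.isEmpty():
        return
    scaler = StandardScaler(withMean=True, withStd=True).fit(rdd)
    scaled = scaler.transform(rdd)
    # ... train / score on `scaled` here

dstream.foreachRDD(process)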
>>
>>
>>
>> On Sat, Nov 19, 2016 at 10:46 AM, debasishg <ghosh.debas...@gmail.com>
>> wrote:
>> > Hello -
>> >
>> > I am trying to implement an outlier detection application on streaming
>> data.
>> > I am a newbie to Spark and hence would like some advice on the
>> confusions
>> > that I have ..
>> >
>> > I am thinking of using StreamingKMeans - is this a good choice ? I have
>> one
>> > stream of data and I need an online algorithm. But here are some
>> questions
>> > that immediately come to my mind ..
>> >
>> > 1. I cannot do separate training, cross validation etc. Is this a good
>> idea
>> > to do training and prediction online ?
>> >
>> > 2. The data will be read from the stream coming from Kafka in
>> microbatches
>> > of (say) 3 seconds. I get a DStream on which I train and get the
>> clusters.
>> > How can I decide on the number of clusters ? Using StreamingKMeans is
>> there
>> > any way I can iterate on microbatches with different values of k to
>> find the
>> > optimal one ?
>> >
>> > 3. Even if I fix k, after training on every microbatch I get a DStream.
>> How
>> > can I compute things like clustering score on the DStream ?
>> > StreamingKMeansModel has a computeCost function but it takes an RDD.

Re: using StreamingKMeans

2016-11-19 Thread ayan guha
Curious why do you want to train your models every 3 secs?
On 20 Nov 2016 06:25, "Debasish Ghosh"  wrote:

> Thanks a lot for the response.
>
> Regarding the sampling part - yeah that's what I need to do if there's no
> way of titrating the number of clusters online.
>
> I am using something like
>
> dstream.foreachRDD { rdd =>
>   if (rdd.count() > 0) { //.. logic
>   }
> }
>
> Feels a little odd but if that's the idiom then I will stick to it.
>
> regards.
>
>
>
> On Sat, Nov 19, 2016 at 10:52 PM, Cody Koeninger 
> wrote:
>
>> So I haven't played around with streaming k means at all, but given
>> that no one responded to your message a couple of days ago, I'll say
>> what I can.
>>
>> 1. Can you not sample out some % of the stream for training?
>> 2. Can you run multiple streams at the same time with different values
>> for k and compare their performance?
>> 3. foreachRDD is fine in general, can't speak to the specifics.
>> 4. If you haven't done any transformations yet on a direct stream,
>> foreachRDD will give you a KafkaRDD.  Checking if a KafkaRDD is empty
>> is very cheap, it's done on the driver only because the beginning and
>> ending offsets are known.  So you should be able to skip empty
>> batches.
>>
>>
>>
>> On Sat, Nov 19, 2016 at 10:46 AM, debasishg 
>> wrote:
>> > Hello -
>> >
>> > I am trying to implement an outlier detection application on streaming
>> data.
>> > I am a newbie to Spark and hence would like some advice on the
>> confusions
>> > that I have ..
>> >
>> > I am thinking of using StreamingKMeans - is this a good choice ? I have
>> one
>> > stream of data and I need an online algorithm. But here are some
>> questions
>> > that immediately come to my mind ..
>> >
>> > 1. I cannot do separate training, cross validation etc. Is this a good
>> idea
>> > to do training and prediction online ?
>> >
>> > 2. The data will be read from the stream coming from Kafka in
>> microbatches
>> > of (say) 3 seconds. I get a DStream on which I train and get the
>> clusters.
>> > How can I decide on the number of clusters ? Using StreamingKMeans is
>> there
>> > any way I can iterate on microbatches with different values of k to
>> find the
>> > optimal one ?
>> >
>> > 3. Even if I fix k, after training on every microbatch I get a DStream.
>> How
>> > can I compute things like clustering score on the DStream ?
>> > StreamingKMeansModel has a computeCost function but it takes an RDD. I
>> can
>> > use dstream.foreachRDD { // process RDD for the micro batch here } - is
>> this
>> > the idiomatic way ?
>> >
>> > 4. If I use dstream.foreachRDD { .. } and use functions like new
>> > StandardScaler().fit(rdd) to do feature normalization, then it works
>> when I
>> > have data in the stream. But when the microbatch is empty (say I don't
>> have
>> > data for some time), the fit method throws exception as it gets an empty
>> > collection. Things start working ok when data starts coming back to the
>> > stream. But is this the way to go ?
>> >
>> > any suggestion will be welcome ..
>> >
>> > regards.
>> >
>> >
>> >
>> > --
>> > View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/using-StreamingKMeans-tp28109.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>>
>
>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>


Re: Spark Partitioning Strategy with Parquet

2016-11-17 Thread ayan guha
Hi

I think you can use the map-reduce paradigm here. Create a key using user ID
and date, with the record as the value. Then you can express your operation (the
"do something" part) as a function. If the function meets certain criteria, such
as being associative and commutative like, say, addition or multiplication, you
can use reduceByKey; else you may use groupByKey.
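A minimal PySpark sketch of that idea, assuming the csv rows look like userid,date,transaction as described further down and that the per-(user, date) operation reduces to something simple like a count (the path and names are illustrative):

lines = sc.textFile("hdfs://path/to/csv")            # userid,date,transaction

def to_pair(line):
    userid, date, txn = line.split(",")
    return ((userid, date), 1)                       # composite key: (user ID, date)

# associative & commutative "do something": count transactions per (user, date)
txn_counts = lines.map(to_pair).reduceByKey(lambda a, b: a + b)

# for logic that cannot be expressed as such a reduce, group instead:
# lines.map(...).groupByKey().mapValues(my_function)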

HTH
On 18 Nov 2016 06:45, "titli batali"  wrote:

>
> That would help, but again, in a particular partition I would need to
> iterate over the customers having the first n letters of the user id in that
> partition. I want to get rid of nested iterations.
>
> Thanks
>
> On Thu, Nov 17, 2016 at 10:28 PM, Xiaomeng Wan  wrote:
>
>> You can partition on the first n letters of userid
>>
>> On 17 November 2016 at 08:25, titli batali  wrote:
>>
>>> Hi,
>>>
>>> I have a use case, where we have 1000 csv files with a column user_Id,
>>> having 8 million unique users. The data contains: userid,date,transaction,
>>> where we run some queries.
>>>
>>> We have a case where we need to iterate over each transaction in a
>>> particular date for each user. There are three nested loops
>>>
>>> for(user){
>>>   for(date){
>>> for(transactions){
>>>   //Do Something
>>>   }
>>>}
>>> }
>>>
>>> i.e. we do a similar thing for every (date, transaction) tuple for a
>>> particular user. In order to get away from the loop structure and decrease
>>> the processing time we are converting the csv files to parquet and
>>> partitioning by userid:
>>> df.write.format("parquet").partitionBy("useridcol").save("hdfs://path").
>>>
>>> So that while reading the parquet files, we read a particular user in a
>>> particular partition, create a Cartesian product of (date X transaction)
>>> and work on the tuples in each partition, to achieve the above level of
>>> nesting. Is partitioning on 8 million users a bad option? What could be
>>> a better way to achieve this?
>>>
>>> Thanks
>>>
>>>
>>>
>>
>>
>


Re: Handling windows characters with Spark CSV on Linux

2016-11-17 Thread ayan guha
There is a utility called dos2unix. You can give it a try.

On 18 Nov 2016 00:20, "Jörn Franke"  wrote:
>
> You can do the conversion of the character set (is this the issue?) as part
of your loading process in Spark.
> As far as I know the Spark CSV package is based on the Hadoop
TextInputFormat. This format, to the best of my knowledge, supports only
UTF-8. So you have to do a conversion from the Windows encoding to UTF-8. If you
refer to language-specific settings (numbers, dates etc.) - this is also not
supported.
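As a sketch of doing that conversion at load time, the Databricks spark-csv package exposes a charset option (the encoding name below, cp1252, is an assumption about the Windows-produced files):

df = (sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("charset", "cp1252")      # decode from the Windows code page on read
      .load("hdfs:///staging/input/*.csv"))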
>
> I started to work on the hadoopoffice library (which you can use with
Spark) where you can read Excel files directly (
https://github.com/ZuInnoTe/hadoopoffice). However, there is no official
release - yet. There you can also specify the language in which you want to
represent data values, numbers etc. when reading the file.
>
> On 17 Nov 2016, at 14:11, Mich Talebzadeh 
wrote:
>
>> Hi,
>>
>> In the past with Databricks package for csv files on occasions I had to
do some cleaning at Linux directory level before ingesting CSV file into
HDFS staging directory for Spark to read it.
>>
>> I have a more generic issue that may need to be addressed.
>>
>> Assume that a provider uses FTP to push CSV files into Windows
>> directories. The whole solution is built around Windows and .NET.
>>
>> Now you want to ingest those files into HDFS and process them with Spark
CSV.
>>
>> One can create NFS directories visible to the Windows server and HDFS
as well. However, there may be issues with character sets etc. What are the
best ways of handling this? One way would be to use some scripts to make
these spreadsheet-type files compatible with Linux and then load them into
HDFS. For example I know that if I save an Excel spreadsheet file with DOS
FORMAT, that file will work OK with Spark CSV.  Are there tools to do this
as well?
>>
>> Thanks
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.
>>
>>


Re: use case reading files split per id

2016-11-14 Thread ayan guha
How about the following approach (a rough sketch follows below) -

- get the list of IDs
- get one RDD for each of them using wholeTextFiles
- map and flatMap to generate a pair RDD with ID as key and list as value
- union all the RDDs together
- group by key
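A minimal PySpark sketch of those steps, assuming the per-ID files live under a directory whose path contains the ID (the layout and names are assumptions for illustration):

ids = ["id1", "id2", "id3"]                          # step 1: the list of IDs

# steps 2-3: one RDD per ID via wholeTextFiles, re-keyed by the ID
rdds = [sc.wholeTextFiles("/data/{}/".format(i))
          .map(lambda path_content, i=i: (i, path_content[1]))
        for i in ids]

# steps 4-5: union them all, then group so each ID gets the list of its file contents
per_id = sc.union(rdds).groupByKey().mapValues(list)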
On 15 Nov 2016 16:43, "Mo Tao"  wrote:

> Hi ruben,
>
> You may try sc.binaryFiles which is designed for lots of small files and it
> can map paths into inputstreams.
> Each inputstream will keep only the path and some configuration, so it
> would
> be cheap to shuffle them.
> However, I'm not sure whether Spark takes data locality into account
> while dealing with these inputstreams.
>
> Hope this helps
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/use-case-reading-files-split-per-
> id-tp28044p28075.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Grouping Set

2016-11-14 Thread ayan guha
And run the same SQL in Hive and post any differences.
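For anyone who wants to reproduce this with small data, a sketch in PySpark (assuming Spark 2.x, or a HiveContext on 1.6, where GROUPING SETS is supported; the table and values are made up):

df = sqlContext.createDataFrame(
    [("xx", "yy", 1.0), ("xx", "zz", 2.0), ("xx", "yy", 3.0)], ["a", "b", "c"])
df.registerTempTable("t")

sqlContext.sql("""
    SELECT a, b, SUM(c)
    FROM t
    GROUP BY a, b
    GROUPING SETS ((a), (a, b))
""").show()
# with grouping sets ((a), (a, b)) the expected rows have shape A below:
# (xx, null, 6.0), (xx, yy, 4.0), (xx, zz, 2.0)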
On 15 Nov 2016 07:48, "ayan guha" <guha.a...@gmail.com> wrote:

> It should be A, yes. Can you please reproduce this with small data and the
> exact SQL?
> On 15 Nov 2016 02:21, "Andrés Ivaldi" <iaiva...@gmail.com> wrote:
>
>> Hello, I'm trying to use Grouping Set, but I don't know if it is a bug or
>> the correct behavior.
>>
>> Given the following example
>> Select a,b,sum(c) from table group by a,b grouping set ( (a), (a,b) )
>>
>> What should be the expected result
>> A:
>>
>> A  | B| sum(c)
>> xx | null | 
>> xx | yy   | 
>> xx | zz   | 
>>
>>
>> B
>> A   | B| sum(c)
>> xx  | null | 
>> xx  | yy   | 
>> xx  | zz   | 
>> null| yy   | 
>> null| zz   | 
>> null| null | 
>>
>>
>> I believe it is A, but I'm getting B
>> thanks
>>
>> --
>> Ing. Ivaldi Andres
>>
>


Re: Grouping Set

2016-11-14 Thread ayan guha
It should be A, yes. Can you please reproduce this with small data and the exact
SQL?
On 15 Nov 2016 02:21, "Andrés Ivaldi"  wrote:

> Hello, I'm trying to use Grouping Set, but I don't know if it is a bug or
> the correct behavior.
>
> Given the following example
> Select a,b,sum(c) from table group by a,b grouping set ( (a), (a,b) )
>
> What should be the expected result
> A:
>
> A  | B| sum(c)
> xx | null | 
> xx | yy   | 
> xx | zz   | 
>
>
> B
> A   | B| sum(c)
> xx  | null | 
> xx  | yy   | 
> xx  | zz   | 
> null| yy   | 
> null| zz   | 
> null| null | 
>
>
> I believe it is A, but I'm getting B
> thanks
>
> --
> Ing. Ivaldi Andres
>


Re: Spark Streaming- ReduceByKey not removing Duplicates for the same key in a Batch

2016-11-12 Thread ayan guha
Have you tried rdd.distinct()?

On Sun, Nov 13, 2016 at 8:28 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Can you come up with a minimal reproducible example?
>
> Probably unrelated, but why are you doing a union of 3 streams?
>
> On Sat, Nov 12, 2016 at 10:29 AM, dev loper <spark...@gmail.com> wrote:
> > There are no failures or errors.  Irrespective of that I am seeing
> > duplicates. The steps and stages are all successful and even the
> speculation
> > is turned off .
> >
> > On Sat, Nov 12, 2016 at 9:55 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
> >>
> >> Are you certain you aren't getting any failed tasks or other errors?
> >> Output actions like foreach aren't exactly once and will be retried on
> >> failures.
> >>
> >>
> >> On Nov 12, 2016 06:36, "dev loper" <spark...@gmail.com> wrote:
> >>>
> >>> Dear fellow Spark Users,
> >>>
> >>> My Spark Streaming application (Spark 2.0, on an AWS EMR YARN cluster)
> >>> listens to Campaigns based on live stock feeds and the batch duration is 5
> >>> seconds. The application uses Kafka DirectStream and, based on the feed
> >>> source, there are three streams. As given in the code snippet I am doing a
> >>> union of the three streams and I am trying to remove the duplicate campaigns
> >>> received using reduceByKey based on the customer and campaignId. I could see
> >>> a lot of duplicate emails being sent out for the same key in the same batch. I
> >>> was expecting reduceByKey to remove the duplicate campaigns in a batch based
> >>> on customer and campaignId. In the logs I am even printing the key and batch
> >>> time before sending the email and I could clearly see duplicates. I could
> >>> see some duplicates getting removed after adding a log in the reduceByKey
> >>> function, but it's not eliminating them completely.
> >>>
> >>> JavaDStream matchedCampaigns =
> >>> stream1.transform(CmpManager::getMatchedCampaigns)
> >>> .union(stream2).union(stream3).cache();
> >>>
> >>> JavaPairDStream<String, Campaign> uniqueCampaigns =
> >>> matchedCampaigns.mapToPair(campaign->{
> >>> String key=campaign.getCustomer()+"_"+campaign.getId();
> >>> return new Tuple2<String, Campaigns>(key, campaign);
> >>> })
> >>> .reduceByKey((campaign1, campaign2)->{return campaign1;});
> >>>
> >>> uniqueCampaigns.foreachRDD(CmpManager::sendEmail);
> >>>
> >>> I am not able to figure out where I am going wrong here. Please help me
> >>> here to get rid of this weird problem. Previously we were using createStream
> >>> for listening to the Kafka queue (number of partitions 1); there we didn't face
> >>> this issue. But when we moved to directStream (number of partitions 100) we
> >>> could easily reproduce this issue under high load.
> >>>
> >>> Note: I even tried reduceByKeyAndWindow with duration of 5 seconds
> >>> instead of reduceByKey Operation, But even that didn't
> >>> help.uniqueCampaigns.reduceByKeyAndWindow((c1,c2)=>c1,
> Durations.Seconds(5),
> >>> Durations.Seconds(5))
> >>>
> >>> I have even requested for help on Stackoverflow , But I haven't
> received
> >>> any solutions to this issue.
> >>>
> >>> Stack Overflow Link
> >>> 
> >>>
> >>> https://stackoverflow.com/questions/40559858/spark-
> streaming-reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch
> >>>
> >>>
> >>> Thanks and Regards
> >>> Dev
> >
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Best Regards,
Ayan Guha


Re: Exception not failing Python applications (in yarn client mode) - SparkLauncher says app succeeded, where app actually has failed

2016-11-12 Thread ayan guha
It's a known issue. There was one jira which was marked resolved but I
still faced it in 1.6.
On 12 Nov 2016 14:33, "Elkhan Dadashov"  wrote:

> Hi,
>
> *Problem*:
> Spark job fails, but RM page says the job succeeded, also
>
> appHandle = sparkLauncher.startApplication()
> ...
>
> appHandle.getState() returns Finished state - which indicates The
> application finished with a successful status, whereas the Spark job
> actually failed.
>
> *Environment*: Macintosh (El Capitan), Hadoop 2.7.2, Spark 2.0,
> SparkLauncher 2.0.1
>
> I have Spark job (pagerank.py) running in yarn-client mode.
>
> *Reason of job failure*: The job fails because dependency package
> pagerank.zip is missing.
>
> *Related Jira (which indicate that bug is fixed)*:
> https://issues.apache.org/jira/browse/SPARK-7736 - this was in
> Yarn-cluster mode, now i face this issue in yarn-client mode.
> https://issues.apache.org/jira/browse/SPARK-9416 (duplicate)
>
> I faced same issue last year with SparkLauncher (spark-launcher_2.11)
> 1.4.0 version, then Marcelo had pull request which fixed the issue, and it
> was working at that time (after Marcelo's fix) for yarn-cluster mode.
>
> *Description*:
> I'm launching Spark job via SparkLauncher#startApplication(),
> 1) in the RM page, it says the job succeeded, even though the Spark job
> has failed.
> 2) in the container logs, i see that appHandle.getState() returned
> Finished state - which also means The application finished with a
> successful status.
>
> But in the same map container log lines I see that *the job is actually
> failed (*I launched Spark job from the map task*)*:
>
> 493 INFO: ImportError: ('No module named pagerank',  0x10703f500>, ('pagerank',))
> 557 INFO: ImportError: ('No module named pagerank',  0x10703f500>, ('pagerank',))
> 591 INFO: ImportError: ('No module named pagerank',  0x10c8a9500>, ('pagerank',))
> 655 INFO: ImportError: ('No module named pagerank',  0x10c8a9500>, ('pagerank',))
> 659 INFO: 16/11/11 18:25:37 ERROR TaskSetManager: Task 0 in stage 0.0
> failed 4 times; aborting job
> 665 INFO: 16/11/11 18:25:37 INFO DAGScheduler: ShuffleMapStage 0 (distinct
> at /private/var/folders/9x/4m9lx2wj4qd8vwwq8n_qb2vx7mkj6g/T/hadoop/hadoop/
> nm-local-dir/usercache//appcache/application_1478901028064_
> 0016/container_1478901028064_0016_01_02/pag erank.py:52) failed in
> 3.221 s
> 667 INFO: 16/11/11 18:25:37 INFO DAGScheduler: *Job 0 failed*: collect at
> /private/var/folders/9x/4m9lx2wj4qd8vwwq8n_qb2vx7mkj6g/T/hadoop/hadoop/
> nm-local-dir/usercache//appcache/application_1478901028064_
> 0016/container_1478901028064_0016_01_02/pagerank. py:68, took
> 3.303328 s
> 681 INFO: py4j.protocol.Py4JJavaError: An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.collectAndServe.
> 683 INFO: : org.apache.spark.SparkException: *Job aborted due to stage
> failure*: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost
> task 0.3 in stage 0.0 (TID 3, ): 
> org.apache.spark.api.python.PythonException:
> Traceback (most recent call last):
> 705 INFO: ImportError: ('No module named pagerank',  0x10c8a9500>, ('pagerank',))
> 745 INFO: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$
> scheduler$DAGScheduler$$failJobAndIndependentStages(
> DAGScheduler.scala:1450)
> 757 INFO: at org.apache.spark.scheduler.DAGScheduler$$anonfun$
> handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
> 759 INFO: at org.apache.spark.scheduler.DAGScheduler$$anonfun$
> handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
> 763 INFO: at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(
> DAGScheduler.scala:811)
> 841 INFO: ImportError: ('No module named pagerank',  0x10c8a9500>, ('pagerank',))
>
> 887 INFO: Spark job with app id: application_1478901028064_0017, *State
> changed to: FINISHED* - The application finished with a successful status.
>
> And here are the log lines from the Spark job container:
> 16/11/11 18:25:37 ERROR Executor: Exception in task 0.2 in stage 0.0 (TID
> 2)
> org.apache.spark.api.python.PythonException: Traceback (most recent call
> last):
> File "/var/folders/9x/4m9lx2wj4qd8vwwq8n_qb2vx7mkj6g/T/hadoop/hadoop/
> nm-local-dir/usercache//appcache/application_1478901028064_
> 0017/container_1478901028064_0017_01_02/pyspark.zip/pyspark/worker.py",
> line 161, in main
> func, profiler, deserializer, serializer = read_command(pickleSer, infile)
> File "/var/folders/9x/4m9lx2wj4qd8vwwq8n_qb2vx7mkj6g/T/hadoop/hadoop/
> nm-local-dir/usercache//appcache/application_1478901028064_
> 0017/container_1478901028064_0017_01_02/pyspark.zip/pyspark/worker.py",
> line 54, in read_command
> command = serializer._read_with_length(file)
> File "/var/folders/9x/4m9lx2wj4qd8vwwq8n_qb2vx7mkj6g/T/hadoop/hadoop/
> nm-local-dir/usercache//appcache/application_1478901028064_
> 0017/container_1478901028064_0017_01_02/pyspark.zip/pyspark/serializers.py",
> line 164, in _read_with_length
> return self.loads(obj)
> File 

Re: DataSet is not able to handle 50,000 columns to sum

2016-11-11 Thread ayan guha
You can explore grouping sets in SQL and write an aggregate function to do an
array-wise (element-wise) sum.

It will boil down to something like

Select attr1, attr2, ..., yourAgg(Val)
From t
Group by attr1, attr2, ...
Grouping sets ((attr1, attr2), (attr1))
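Outside SQL, the index-wise sum itself can be sketched with plain RDD operations in PySpark (assuming a DataFrame df whose DoubleArray column is already parsed into a list of doubles; column names follow the example below):

import numpy as np

pairs = df.rdd.map(lambda r: ((r["Attribute_0"], r["Attribute_1"]),
                              np.array(r["DoubleArray"], dtype=float)))

# index-wise vector addition per combination key, e.g. key (5, 3) or (3, 2)
summed = pairs.reduceByKey(lambda a, b: a + b)

# then the per-key statistics on the summed vector
stats = summed.mapValues(lambda v: {"avg": float(v.mean()),
                                    "min": float(v.min()),
                                    "max": float(v.max())})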
On 12 Nov 2016 04:57, "Anil Langote"  wrote:

> Hi All,
>
>
>
> I have been working on one use case and couldn't come up with a better
> solution. I have seen you are very active on the Spark user list, so please share
> your thoughts on the implementation. Below is the requirement.
>
>
>
> I have tried using a Dataset by splitting the double array column, but it
> fails when the double array size grows. When I create the double array schema
> data type, Spark doesn't allow me to sum them because that can only be done on
> numeric types. If I think about storing one file per combination in
> parquet, there will be too many parquet files.
>
>
>
> *Input:* The input file will be like below; in the real data the attributes
> will be *20* & the double array size would be *50,000*
>
>
>
>
>
> Attribute_0 | Attribute_1 | Attribute_2 | Attribute_3 | DoubleArray
> 5 | 3 | 5 | 3 | 0.2938933463658645  0.0437040427073041  0.23002681025029648  0.18003221216680454
> 3 | 2 | 1 | 3 | 0.5353599620508771  0.026777650111232787  0.31473082754161674  0.2647786522276575
> 5 | 3 | 5 | 2 | 0.8803063581705307  0.8101324740101096  0.48523937757683544  0.5897714618376072
> 3 | 2 | 1 | 3 | 0.33960064683141955  0.46537001358164043  0.543428826489435  0.42653939565053034
> 2 | 2 | 0 | 5 | 0.5108235777360906  0.4368119043922922  0.8651556676944931  0.7451477943975504
>
>
>
> Now below are the possible combinations in above data set this will be all
> possible combinations
>
>
>
> 1.  Attribute_0, Attribute_1
> 2.  Attribute_0, Attribute_2
> 3.  Attribute_0, Attribute_3
> 4.  Attribute_1, Attribute_2
> 5.  Attribute_2, Attribute_3
> 6.  Attribute_1, Attribute_3
> 7.  Attribute_0, Attribute_1, Attribute_2
> 8.  Attribute_0, Attribute_1, Attribute_3
> 9.  Attribute_0, Attribute_2, Attribute_3
> 10.  Attribute_1, Attribute_2, Attribute_3
> 11.  Attribute_1, Attribute_2, Attribute_3, Attribute_4
>
>
>
> Now we have to process all these combinations on input data preferably
> parallel to get good performance.
>
>
>
> *Attribute_0, Attribute_1*
>
>
>
> In this iteration the other attributes (*Attribute_2, Attribute_3*) are
> not required; all we need is Attribute_0, Attribute_1 & the double array
> column. If you look at the data there are two possible combinations: one
> is 5_3 and the other one is 3_2. We have to pick only those which have at
> least 2 rows; in real data we will get thousands.
>
>
>
>
>
> Attribute_0 | Attribute_1 | Attribute_2 | Attribute_3 | DoubleArray
> 5 | 3 | 5 | 3 | 0.2938933463658645  0.0437040427073041  0.23002681025029648  0.18003221216680454
> 3 | 2 | 1 | 3 | 0.5353599620508771  0.026777650111232787  0.31473082754161674  0.2647786522276575
> 5 | 3 | 5 | 2 | 0.8803063581705307  0.8101324740101096  0.48523937757683544  0.5897714618376072
> 3 | 2 | 1 | 3 | 0.33960064683141955  0.46537001358164043  0.543428826489435  0.42653939565053034
> 2 | 2 | 0 | 5 | 0.5108235777360906  0.4368119043922922  0.8651556676944931  0.7451477943975504
>
>
>
> when we do the groupBy on above dataset with columns *Attribute_0,
> Attribute_1 *we will get two records with keys 5_3 & 3_2 and each key
> will have two double arrays.
>
>
>
> 5_3 ==> 0.2938933463658645  0.0437040427073041  0.23002681025029648
> 0.18003221216680454 & 0.8803063581705307  0.8101324740101096
> 0.48523937757683544  0.5897714618376072
>
>
>
> 3_2 ==> 0.5353599620508771  0.026777650111232787  0.31473082754161674
> 0.2647786522276575 & 0.33960064683141955  0.46537001358164043
> 0.543428826489435  0.42653939565053034
>
>
>
> now we have to add these double arrays index-wise and produce one array
>
>
>
> 5_3 ==>  [1.1741997045363952, 0.8538365167174137, 0.7152661878271319,
> 0.7698036740044117]
>
> 3_2 ==> [0.8749606088822967, 0.4921476636928732, 0.8581596540310518,
> 0.6913180478781878]
>
>
>
> After adding we have to compute average, min, max etc. on this vector and
> store the results against the keys.
>
>
>
> Same process will be repeated for next combinations.
>
>
>
>
>
>
>
> Thank you
>
> Anil Langote
>
> +1-425-633-9747
>
>
>


Re: importing data into hdfs/spark using Informatica ETL tool

2016-11-09 Thread ayan guha
Yes, it can be done and it is a standard practice. I would suggest a mixed
approach: use Informatica to create files in HDFS and have Hive staging
tables as external tables on those directories. From that point onwards use
Spark.
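A sketch of that staging step from PySpark (the landing path, database and columns are illustrative assumptions, not details from this thread):

# Hive staging table defined over the directory where Informatica lands the files
hiveContext.sql("""
    CREATE EXTERNAL TABLE IF NOT EXISTS staging.customer_feed (
        customer_id STRING,
        txn_date    STRING,
        amount      DOUBLE
    )
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    LOCATION 'hdfs:///landing/customer_feed/'
""")

# from that point onwards, plain Spark SQL / DataFrames
df = hiveContext.table("staging.customer_feed")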

Hth
Ayan
On 10 Nov 2016 04:00, "Mich Talebzadeh"  wrote:

> Thanks Mike for insight.
>
> This is a request landed on us which is rather unusual.
>
> As I understand Informatica is an ETL tool. Most of these are glorified
> Sqoop with GUI where you define your source and target.
>
> In a normal day Informatica takes data out of an RDBMS like Oracle table
> and lands it on Teradata or Sybase IQ (DW).
>
> So in our case we really need to redefine the map. Customer does not want
> the plug in from the Informatica for Hive etc which admittedly will make
> life far easier. They want us to come up with a solution.
>
> Given that we cannot use JDBC for Hive etc. as a target
> (?), the easiest option is to dump the data into the landing zone and then do
> whatever we want with it.
>
> Also I am not sure we can use Flume for it? That was a thought in my mind.
>
> So sort of stuck between Hard and Rock here. So in short we want a plug in
> to be consumer of Informatica.
>
> cheers
>
> Mich
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 9 November 2016 at 16:14, Michael Segel 
> wrote:
>
>> Mich,
>>
>> You could do that. But really?
>>
>> Putting on my solutions architect hat…
>>
>> You or your client is spending $$$ for product licensing and you’re not
>> really using the product to its fullest.
>>
>> Yes, you can use Informatica to pull data from the source systems and
>> provide some data cleansing and transformations before you drop it on your
>> landing zone.
>>
>> If you’re going to bypass Hive, then you have to capture the schema,
>> including data types.  You’re also going to have to manage schema evolution
>> as they change over time. (I believe the ETL tools will do this for you or
>> help in the process.)
>>
>> But if you’re already working on the consumption process for ingestion on
>> your own… what is the value that you derive from using Informatica?  Is the
>> unloading and ingestion process that difficult that you can’t write that as
>> well?
>>
>> My point is that if you’re going to use the tool, use it as the vendor
>> recommends (and they may offer options…) or skip it.
>>
>> I mean heck… you may want to take the flat files (CSV, etc) that are
>> dropped in the landing zone, and then ingest and spit out parquet files via
>> spark. You just need to know the Schema(s) of ingestion and output if they
>> are not the same. ;-)
>>
>> Of course you may decide that using Informatica to pull and transform the
>> data and drop it on to the landing zone provides enough value to justify
>> its expense.  ;-) YMMV
>>
>> Just my $0.02 worth.
>>
>> Take it with a grain of Kosher Sea Salt.  (The grains are larger and the
>> salt taste’s better) ;-)
>>
>> -Mike
>>
>> On Nov 9, 2016, at 7:56 AM, Mich Talebzadeh 
>> wrote:
>>
>> Hi,
>>
>> I am exploring the idea of flexibility with importing multiple RDBMS
>> tables using Informatica that customer has into HDFS.
>>
>> I don't want to use connectivity tools from Informatica to Hive etc.
>>
>> So this is what I have in mind
>>
>>
>>1. If possible get the tables data out using Informatica and use
>>Informatica ui  to convert RDBMS data into some form of CSV, TSV file (Can
>>Informatica do it?) I guess yes
>>2. Put the flat files on an edge where HDFS node can see them.
>>3. Assuming that a directory can be created by Informatica daily,
>>periodically run a cron that ingest that data from directories into HDFS
>>equivalent daily directories
>>4. Once the data is in HDFS one can use, Spark csv, Hive etc to query
>>data
>>
>> The problem I have is to see if someone has done such thing before.
>> Specifically can Informatica create target flat files on normal directories.
>>
>> Any other generic alternative?
>>
>> Thanks
>>
>> Dr Mich Talebzadeh
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or 

Re: Newbie question - Best way to bootstrap with Spark

2016-11-06 Thread ayan guha
I would start with the Spark documentation, really. Then you would probably
move on to some older videos from YouTube, especially the Spark Summit
2014, 2015 and 2016 videos. Regarding practice, I would strongly suggest
Databricks cloud (or download a prebuilt Spark from the Spark site). You can also
take courses from edX/Berkeley, which are very good starter courses.

On Mon, Nov 7, 2016 at 11:57 AM, raghav <raghavas...@gmail.com> wrote:

> I am newbie in the world of big data analytics, and I want to teach myself
> Apache Spark, and want to be able to write scripts to tinker with data.
>
> I have some understanding of Map Reduce but have not had a chance to get my
> hands dirty. There are tons of resources for Spark, but I am looking for
> some guidance for starter material, or videos.
>
> Thanks.
>
> Raghav
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Newbie-question-Best-way-to-
> bootstrap-with-Spark-tp28032.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Best Regards,
Ayan Guha


Re: Confusion SparkSQL DataFrame OrderBy followed by GroupBY

2016-11-03 Thread ayan guha
I would go for the partition-by (window function) option. It seems simple and
yes, SQL-inspired :)
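For reference, a minimal PySpark sketch of that window approach for "latest row per key", reusing the column names from the code quoted below (an illustration, not the original poster's code):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("mobileno").orderBy(F.desc("transaction_date"))

latest = (df.withColumn("rn", F.row_number().over(w))
            .filter(F.col("rn") == 1)
            .select("customername", "service_type", "mobileno", "cust_addr"))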
On 4 Nov 2016 00:59, "Rabin Banerjee"  wrote:

> Hi Koert & Robin ,
>
> *  Thanks ! *But if you go through the blog https://bzhangusc.
> wordpress.com/2015/05/28/groupby-on-dataframe-is-not-the-groupby-on-rdd/ and
> check the comments under the blog it's actually working, although I am not
> sure how . And yes I agree a custom aggregate UDAF is a good option .
>
> Can anyone share the best way to implement this in Spark .?
>
> Regards,
> Rabin Banerjee
>
> On Thu, Nov 3, 2016 at 6:59 PM, Koert Kuipers  wrote:
>
>> Just realized you only want to keep first element. You can do this
>> without sorting by doing something similar to min or max operation using a
>> custom aggregator/udaf or reduceGroups on Dataset. This is also more
>> efficient.
>>
>> On Nov 3, 2016 7:53 AM, "Rabin Banerjee" 
>> wrote:
>>
>>> Hi All ,
>>>
>>>   I want to do a dataframe operation to find the rows having the latest
>>> timestamp in each group using the below operation
>>>
>>> df.orderBy(desc("transaction_date")).groupBy("mobileno").agg(first("customername").as("customername"),first("service_type").as("service_type"),first("cust_addr").as("cust_abbr"))
>>> .select("customername","service_type","mobileno","cust_addr")
>>>
>>>
>>> *Spark Version :: 1.6.x*
>>>
>>> My Question is *"Will Spark guarantee the Order while doing the groupBy , 
>>> if DF is ordered using OrderBy previously in Spark 1.6.x"??*
>>>
>>>
>>> *I referred a blog here :: 
>>> **https://bzhangusc.wordpress.com/2015/05/28/groupby-on-dataframe-is-not-the-groupby-on-rdd/
>>>  
>>> *
>>>
>>> *Which claims it will work except in Spark 1.5.1 and 1.5.2 .*
>>>
>>>
>>> *I need a bit elaboration of how internally spark handles it ? also is it 
>>> more efficient than using a Window function ?*
>>>
>>>
>>> *Thanks in Advance ,*
>>>
>>> *Rabin Banerjee*
>>>
>>>
>>>
>>>
>


Re: Spark ML - Is IDF model reusable

2016-11-01 Thread ayan guha
Yes, that is correct. I think I misread a part of it in terms of
scoring... I think we both are saying the same thing, so that's a good thing :)

On Wed, Nov 2, 2016 at 10:04 AM, Nirav Patel <npa...@xactlycorp.com> wrote:

> Hi Ayan,
>
> "classification algorithm will for sure need to Fit against new dataset
> to produce new model" I said this in context of re-training the model. Is
> it not correct? Isn't it part of re-training?
>
> Thanks
>
> On Tue, Nov 1, 2016 at 4:01 PM, ayan guha <guha.a...@gmail.com> wrote:
>
>> Hi
>>
>> "classification algorithm will for sure need to Fit against new dataset
>> to produce new model" - I do not think this is correct. Maybe we are
>> talking semantics but AFAIU, you "train" one model using some dataset, and
>> then use it for scoring new datasets.
>>
>> You may re-train every month, yes. And you may run cross validation once
>> a month (after re-training) or lower freq like once in 2-3 months to
>> validate model quality. Here, number of months are not important, but you
>> must be running cross validation and similar sort of "model evaluation"
>> work flow typically in lower frequency than Re-Training process.
>>
>> On Wed, Nov 2, 2016 at 5:48 AM, Nirav Patel <npa...@xactlycorp.com>
>> wrote:
>>
>>> Hi Ayan,
>>> After deployment, we might re-train it every month. That is a whole
>>> different problem I haven't explored yet. The classification algorithm will for
>>> sure need to fit against the new dataset to produce a new model. Correct me if I
>>> am wrong, but I think I will also fit a new IDF model based on the new dataset. At
>>> that time as well I will follow the same training-validation split (or
>>> cross-validation) to evaluate model performance on new data before
>>> releasing it to make predictions. So afaik, every time you need to re-train the
>>> model you will need to cross-validate using some data split strategy.
>>>
>>> I think the Spark ML documentation should start explaining the mathematical model
>>> or simple algorithm behind what fit and transform mean for each particular
>>> algorithm (IDF, NaiveBayes)
>>>
>>> Thanks
>>>
>>> On Tue, Nov 1, 2016 at 5:45 AM, ayan guha <guha.a...@gmail.com> wrote:
>>>
>>>> I have come across similar situation recently and decided to run
>>>> Training  workflow less frequently than scoring workflow.
>>>>
>>>> In your use case I would imagine you will run IDF fit workflow once in
>>>> say a week. It will produce a model object which will be saved. In scoring
>>>> workflow, you will typically see new unseen dataset and the model generated
>>>> in training flow will be used to score or label this new dataset.
>>>>
>>>> Note, train and test datasets are used during development phase when
>>>> you are trying to find out which model to use and
>>>> efficiency/performance/accuracy etc. It will never be part of
>>>> workflow. In a little elaborate setting you may want to automate model
>>>> evaluations, but that's a different story.
>>>>
>>>> Not sure if I could explain properly, please feel free to comment.
>>>> On 1 Nov 2016 22:54, "Nirav Patel" <npa...@xactlycorp.com> wrote:
>>>>
>>>>> Yes, I do apply NaiveBayes after IDF .
>>>>>
>>>>> " you can re-train (fit) on all your data before applying it to
>>>>> unseen data." Did you mean I can reuse that model to Transform both
>>>>> training and test data?
>>>>>
>>>>> Here's the process:
>>>>>
>>>>> Datasets:
>>>>>
>>>>>1. Full sample data (labeled)
>>>>>2. Training (labeled)
>>>>>3. Test (labeled)
>>>>>4. Unseen (non-labeled)
>>>>>
>>>>> Here are two workflow options I see:
>>>>>
>>>>> Option - 1 (currently using)
>>>>>
>>>>>1. Fit IDF model (idf-1) on full Sample data
>>>>>2. Apply(Transform) idf-1 on full sample data
>>>>>3. Split data set into Training and Test data
>>>>>4. Fit ML model on Training data
>>>>>5. Apply(Transform) model on Test data
>>>>>6. Apply(Transform) idf-1 on Unseen data
>>>>>7. Apply(Transform) model on Unseen data
>>>>>
>>>>> Option - 2
>>>>>
>>>>> 

Re: Spark ML - Is IDF model reusable

2016-11-01 Thread ayan guha
Hi

"classification algorithm will for sure need to Fit against new dataset to
produce new model" - I do not think this is correct. Maybe we are talking
semantics but AFAIU, you "train" one model using some dataset, and then use
it for scoring new datasets.

You may re-train every month, yes. And you may run cross validation once a
month (after re-training) or lower freq like once in 2-3 months to validate
model quality. Here, number of months are not important, but you must be
running cross validation and similar sort of "model evaluation" work flow
typically in lower frequency than Re-Training process.
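For reference, a rough PySpark sketch of the "fit on training data only, then reuse" flow (Option 2 in the message quoted further down), assuming a tokenized, labeled DataFrame called labeled and Spark 2.x ML persistence; column names and paths are illustrative:

from pyspark.ml.feature import HashingTF, IDF

train, test = labeled.randomSplit([0.8, 0.2], seed=42)

tf = HashingTF(inputCol="tokens", outputCol="tf")
idf = IDF(inputCol="tf", outputCol="features")

idf_model = idf.fit(tf.transform(train))             # fit IDF on training data only
train_vec = idf_model.transform(tf.transform(train))
test_vec  = idf_model.transform(tf.transform(test))

# persist the fitted transformer so the exact same IDF can be applied to unseen data later
idf_model.write().overwrite().save("/models/idf-1")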

On Wed, Nov 2, 2016 at 5:48 AM, Nirav Patel <npa...@xactlycorp.com> wrote:

> Hi Ayan,
> After deployment, we might re-train it every month. That is a whole
> different problem I haven't explored yet. The classification algorithm will for
> sure need to fit against the new dataset to produce a new model. Correct me if I
> am wrong, but I think I will also fit a new IDF model based on the new dataset. At
> that time as well I will follow the same training-validation split (or
> cross-validation) to evaluate model performance on new data before
> releasing it to make predictions. So afaik, every time you need to re-train the
> model you will need to cross-validate using some data split strategy.
>
> I think the Spark ML documentation should start explaining the mathematical model
> or simple algorithm behind what fit and transform mean for each particular
> algorithm (IDF, NaiveBayes)
>
> Thanks
>
> On Tue, Nov 1, 2016 at 5:45 AM, ayan guha <guha.a...@gmail.com> wrote:
>
>> I have come across similar situation recently and decided to run
>> Training  workflow less frequently than scoring workflow.
>>
>> In your use case I would imagine you will run IDF fit workflow once in
>> say a week. It will produce a model object which will be saved. In scoring
>> workflow, you will typically see new unseen dataset and the model generated
>> in training flow will be used to score or label this new dataset.
>>
>> Note, train and test datasets are used during development phase when you
>> are trying to find out which model to use and 
>> efficiency/performance/accuracy
>> etc. It will never be part of workflow. In a little elaborate setting you
>> may want to automate model evaluations, but that's a different story.
>>
>> Not sure if I could explain properly, please feel free to comment.
>> On 1 Nov 2016 22:54, "Nirav Patel" <npa...@xactlycorp.com> wrote:
>>
>>> Yes, I do apply NaiveBayes after IDF .
>>>
>>> " you can re-train (fit) on all your data before applying it to unseen
>>> data." Did you mean I can reuse that model to Transform both training and
>>> test data?
>>>
>>> Here's the process:
>>>
>>> Datasets:
>>>
>>>1. Full sample data (labeled)
>>>2. Training (labeled)
>>>3. Test (labeled)
>>>4. Unseen (non-labeled)
>>>
>>> Here are two workflow options I see:
>>>
>>> Option - 1 (currently using)
>>>
>>>1. Fit IDF model (idf-1) on full Sample data
>>>2. Apply(Transform) idf-1 on full sample data
>>>3. Split data set into Training and Test data
>>>4. Fit ML model on Training data
>>>5. Apply(Transform) model on Test data
>>>6. Apply(Transform) idf-1 on Unseen data
>>>7. Apply(Transform) model on Unseen data
>>>
>>> Option - 2
>>>
>>>1. Split sample data into Training and Test data
>>>2. Fit IDF model (idf-1) only on training data
>>>3. Apply(Transform) idf-1 on training data
>>>4. Apply(Transform) idf-1 on test data
>>>5. Fit ML model on Training data
>>>6. Apply(Transform) model on Test data
>>>7. Apply(Transform) idf-1 on Unseen data
>>>8. Apply(Transform) model on Unseen data
>>>
>>> So you are suggesting Option-2 in this particular case, right?
>>>
>>> On Tue, Nov 1, 2016 at 4:24 AM, Robin East <robin.e...@xense.co.uk>
>>> wrote:
>>>
>>>> Fit it on training data to evaluate the model. You can either use that
>>>> model to apply to unseen data or you can re-train (fit) on all your data
>>>> before applying it to unseen data.
>>>>
>>>> fit and transform are 2 different things: fit creates a model,
>>>> transform applies a model to data to create transformed output. If you are
>>>> using your training data in a subsequent step (e.g. running logistic
>>>> regression or some other machine learn

Re: Add jar files on classpath when submitting tasks to Spark

2016-11-01 Thread ayan guha
There are options to specify external jars in the form of --jars,
--driver-class-path etc., depending on the Spark version and cluster manager.
Please see the configuration section of the Spark documentation and/or run
spark-submit --help to see the available options.
On 1 Nov 2016 23:13, "Jan Botorek"  wrote:

> Hello,
>
> I have a problem trying to add jar files to be available on classpath when
> submitting task to Spark.
>
>
>
> In my spark-defaults.conf file I have configuration:
>
> *spark.driver.extraClassPath = path/to/folder/with/jars*
>
> all jars in the folder are available in SPARK-SHELL
>
>
>
> The problem is that jars are not on the classpath for SPARK-MASTER; more
> precisely – when I submit any job that utilizes any jar from external
> folder, the* java.lang.ClassNotFoundException* is thrown.
>
> Moving all external jars into the *jars* folder solves the situation, but
> we need to keep external files separatedly.
>
>
>
> Thank you for any help
>
> Best regards,
>
> Jan
>


Re: Spark ML - Is IDF model reusable

2016-11-01 Thread ayan guha
I have come across similar situation recently and decided to run Training
workflow less frequently than scoring workflow.

In your use case I would imagine you will run IDF fit workflow once in say
a week. It will produce a model object which will be saved. In scoring
workflow, you will typically see new unseen dataset and the model generated
in training flow will be used to score or label this new dataset.

Note, train and test datasets are used during the development phase, when you
are trying to find out which model to use and its
efficiency/performance/accuracy etc. It will never be part of the workflow. In
a more elaborate setting you may want to automate model evaluations, but
that's a different story.

Not sure if I could explain properly, please feel free to comment.
On 1 Nov 2016 22:54, "Nirav Patel"  wrote:

> Yes, I do apply NaiveBayes after IDF .
>
> " you can re-train (fit) on all your data before applying it to unseen
> data." Did you mean I can reuse that model to Transform both training and
> test data?
>
> Here's the process:
>
> Datasets:
>
>1. Full sample data (labeled)
>2. Training (labeled)
>3. Test (labeled)
>4. Unseen (non-labeled)
>
> Here are two workflow options I see:
>
> Option - 1 (currently using)
>
>1. Fit IDF model (idf-1) on full Sample data
>2. Apply(Transform) idf-1 on full sample data
>3. Split data set into Training and Test data
>4. Fit ML model on Training data
>5. Apply(Transform) model on Test data
>6. Apply(Transform) idf-1 on Unseen data
>7. Apply(Transform) model on Unseen data
>
> Option - 2
>
>1. Split sample data into Training and Test data
>2. Fit IDF model (idf-1) only on training data
>3. Apply(Transform) idf-1 on training data
>4. Apply(Transform) idf-1 on test data
>5. Fit ML model on Training data
>6. Apply(Transform) model on Test data
>7. Apply(Transform) idf-1 on Unseen data
>8. Apply(Transform) model on Unseen data
>
> So you are suggesting Option-2 in this particular case, right?
>
> On Tue, Nov 1, 2016 at 4:24 AM, Robin East  wrote:
>
>> Fit it on training data to evaluate the model. You can either use that
>> model to apply to unseen data or you can re-train (fit) on all your data
>> before applying it to unseen data.
>>
>> fit and transform are 2 different things: fit creates a model, transform
>> applies a model to data to create transformed output. If you are using your
>> training data in a subsequent step (e.g. running logistic regression or
>> some other machine learning algorithm) then you need to transform your
>> training data using the IDF model before passing it through the next step.
>>
>> 
>> ---
>> Robin East
>> *Spark GraphX in Action* Michael Malak and Robin East
>> Manning Publications Co.
>> http://www.manning.com/books/spark-graphx-in-action
>>
>>
>>
>>
>>
>> On 1 Nov 2016, at 11:18, Nirav Patel  wrote:
>>
>> Just to re-iterate what you said, I should fit IDF model only on training
>> data and then re-use it for both test data and then later on unseen data to
>> make predictions.
>>
>> On Tue, Nov 1, 2016 at 3:49 AM, Robin East 
>> wrote:
>>
>>> The point of setting aside a portion of your data as a test set is to
>>> try and mimic applying your model to unseen data. If you fit your IDF model
>>> to all your data, any evaluation you perform on your test set is likely to
>>> over perform compared to ‘real’ unseen data. Effectively you would have
>>> overfit your model.
>>> 
>>> ---
>>> Robin East
>>> *Spark GraphX in Action* Michael Malak and Robin East
>>> Manning Publications Co.
>>> http://www.manning.com/books/spark-graphx-in-action
>>>
>>>
>>>
>>>
>>>
>>> On 1 Nov 2016, at 10:15, Nirav Patel  wrote:
>>>
>>> FYI, I do reuse IDF model while making prediction against new unlabeled
>>> data but not between training and test data while training a model.
>>>
>>> On Tue, Nov 1, 2016 at 3:10 AM, Nirav Patel 
>>> wrote:
>>>
 I am using IDF estimator/model (TF-IDF) to convert text features into
 vectors. Currently, I fit IDF model on all sample data and then transform
 them. I read somewhere that I should split my data into training and test
 before fitting IDF model; Fit IDF only on training data and then use same
 transformer to transform training and test data.
 This raise more questions:
 1) Why would you do that? What exactly do IDF learn during fitting
 process that it can reuse to transform any new dataset. Perhaps idea is to
 keep same value for |D| and DF|t, D| while use new TF|t, D| ?
 2) If not then fitting and transforming seems redundant for IDF model

>>>
>>>
>>>
>>>
>>> [image: What's New with Xactly] 

Re: spark dataframe rolling window for user define operation

2016-10-29 Thread ayan guha
Avg is an aggregation function. You need to write XYZ as user defined
aggregate function (UDAF).
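PySpark does not expose a UDAF API directly, so one workaround sketch (assuming Spark 2.x, where collect_list can be used as a window function) is to collect the window into an array and apply an ordinary UDF - here an alternating +, -, +, - sum as asked about in the quoted message; names mirror that snippet:

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from pyspark.sql.window import Window

# alternating +, -, +, - over the values collected from the window
alt_sum = F.udf(lambda xs: float(sum(v if i % 2 == 0 else -v
                                     for i, v in enumerate(xs))), DoubleType())

w = Window.orderBy("c1").rowsBetween(-20, 20)
dfWithAlternate = df.withColumn("alter", alt_sum(F.collect_list("c2").over(w)))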

On Sat, Oct 29, 2016 at 9:28 PM, Manjunath, Kiran <kiman...@akamai.com>
wrote:

> Is there a way to get user defined operation to be used for rolling window
> operation?
>
>
>
> Like – Instead of
>
>
>
> val wSpec1 = Window.orderBy("c1").rowsBetween(-20, +20)
>
> var dfWithMovingAvg = df.withColumn( "Avg",avg(df("c2")).over(wSpec1))
>
>
>
> Something like
>
>
>
> val wSpec1 = Window.orderBy("c1").rowsBetween(-20, +20)
>
> var dfWithAlternate = df.withColumn( "alter",*XYZ*(df("c2")).over(wSpec1))
>
>
>
> Where XYZ function can be - +,-,+,- alternatively
>
>
>
>
>
> PS : I have posted the same question at http://stackoverflow.com/
> questions/40318010/spark-dataframe-rolling-window-user-define-operation
>
>
>
> Regards,
>
> Kiran
>



-- 
Best Regards,
Ayan Guha


Re: HiveContext is Serialized?

2016-10-26 Thread ayan guha
In your use case, your deDF need not be a DataFrame. You could use
sc.textFile().collect().
Even better, you can just read off a local file, as your file is very small,
unless you are planning to use yarn-cluster mode.
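A rough Python sketch of that suggestion, mirroring the Scala code quoted below; the point is that the HiveContext is only ever used on the driver (data_elements_file stands in for args(0)):

# collect the small list of data elements on the driver, then loop there
data_elements = [r[0].strip() for r in
                 hiveContext.read.text(data_elements_file).distinct().collect()]

for de in data_elements:
    df1 = hiveContext.sql(
        "SELECT cyc_dt, supplier_proc_i, '{0}' AS data_elm, {0} AS data_elm_val "
        "FROM TEST_DB.TEST_TABLE1".format(de))
    df1.write.insertInto("TEST_DB.TEST_TABLE1")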
On 26 Oct 2016 16:43, "Ajay Chander"  wrote:

> Sean, thank you for making it clear. It was helpful.
>
> Regards,
> Ajay
>
> On Wednesday, October 26, 2016, Sean Owen  wrote:
>
>> This usage is fine, because you are only using the HiveContext locally on
>> the driver. It's applied in a function that's used on a Scala collection.
>>
>> You can't use the HiveContext or SparkContext in a distribution
>> operation. It has nothing to do with for loops.
>>
>> The fact that they're serializable is misleading. It's there, I believe,
>> because these objects may be inadvertently referenced in the closure of a
>> function that executes remotely, yet doesn't use the context. The closure
>> cleaner can't always remove this reference. The task would fail to
>> serialize even though it doesn't use the context. You will find these
>> objects serialize but then don't work if used remotely.
>>
>> The NPE you see is an unrelated cosmetic problem that was fixed in 2.0.1
>> IIRC.
>>
>> On Wed, Oct 26, 2016 at 4:28 AM Ajay Chander  wrote:
>>
>>> Hi Everyone,
>>>
>>> I was thinking if I can use hiveContext inside foreach like below,
>>>
>>> object Test {
>>>   def main(args: Array[String]): Unit = {
>>>
>>> val conf = new SparkConf()
>>> val sc = new SparkContext(conf)
>>> val hiveContext = new HiveContext(sc)
>>>
>>> val dataElementsFile = args(0)
>>> val deDF = 
>>> hiveContext.read.text(dataElementsFile).toDF("DataElement").coalesce(1).distinct().cache()
>>>
>>> def calculate(de: Row) {
>>>   val dataElement = de.getAs[String]("DataElement").trim
>>>   val df1 = hiveContext.sql("SELECT cyc_dt, supplier_proc_i, '" + 
>>> dataElement + "' as data_elm, " + dataElement + " as data_elm_val FROM 
>>> TEST_DB.TEST_TABLE1 ")
>>>   df1.write.insertInto("TEST_DB.TEST_TABLE1")
>>> }
>>>
>>> deDF.collect().foreach(calculate)
>>>   }
>>> }
>>>
>>>
>>> I looked at 
>>> https://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
>>>  and I see it is extending SqlContext which extends Logging with 
>>> Serializable.
>>>
>>> Can anyone tell me if this is the right way to use it ? Thanks for your 
>>> time.
>>>
>>> Regards,
>>>
>>> Ajay
>>>
>>>


Re: Spark 1.2

2016-10-25 Thread ayan guha
Thank you both.

On Tue, Oct 25, 2016 at 11:30 PM, Sean Owen <so...@cloudera.com> wrote:

> archive.apache.org will always have all the releases:
> http://archive.apache.org/dist/spark/
>
> On Tue, Oct 25, 2016 at 1:17 PM ayan guha <guha.a...@gmail.com> wrote:
>
>> Just in case, anyone knows how I can download Spark 1.2? It is not
>> showing up in Spark download page drop down
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>


-- 
Best Regards,
Ayan Guha


Spark 1.2

2016-10-25 Thread ayan guha
Just in case, anyone knows how I can download Spark 1.2? It is not showing
up in Spark download page drop down

-- 
Best Regards,
Ayan Guha


Re: Can i display message on console when use spark on yarn?

2016-10-20 Thread ayan guha
What exactly do you mean by YARN console? We use spark-submit and it
generates exactly the same log as you mentioned on the driver console.

On Thu, Oct 20, 2016 at 8:21 PM, Jone Zhang <joyoungzh...@gmail.com> wrote:

> I submit spark with "spark-submit --master yarn-cluster --deploy-mode
> cluster"
> How can i display message on yarn console.
> I expect it to be like this:
>
> .
> 16/10/20 17:12:53 main INFO org.apache.spark.deploy.yarn.Client>SPK>
> Application report for application_1453970859007_481440 (state: RUNNING)
> 16/10/20 17:12:58 main INFO org.apache.spark.deploy.yarn.Client>SPK>
> Application report for application_1453970859007_481440 (state: RUNNING)
> 16/10/20 17:13:03 main INFO org.apache.spark.deploy.yarn.Client>SPK>
> Application report for application_1453970859007_481440 (state: RUNNING)
> 16/10/20 17:13:08 main INFO org.apache.spark.deploy.yarn.Client>SPK>
> Application report for application_1453970859007_481440 (state: RUNNING)
> 16/10/20 17:13:13 main INFO org.apache.spark.deploy.yarn.Client>SPK>
> Application report for application_1453970859007_481440 (state: FINISHED)
> 16/10/20 17:13:13 main INFO org.apache.spark.deploy.yarn.Client>SPK>
>  client token: N/A
>  diagnostics: N/A
>  ApplicationMaster host: 10.51.215.100
>  ApplicationMaster RPC port: 0
>  queue: root.default
>  start time: 1476954698645
>  final status: SUCCEEDED
>  tracking URL: http://10.179.20.47:8080/proxy/application_
> 1453970859007_481440/history/application_1453970859007_481440/1
>  user: mqq
> ===Spark Task Result is ===
> ===some message want to display===
> 16/10/20 17:13:13 Thread-3 INFO org.apache.spark.util.ShutdownHookManager>SPK>
> Shutdown hook called
> 16/10/20 17:13:13 Thread-3 INFO org.apache.spark.util.ShutdownHookManager>SPK>
> Deleting directory /data/home/spark_tmp/spark-5b9f434b-5837-46e6-9625-
> c4656b86af9e
>
> Thanks.
>
>
>


-- 
Best Regards,
Ayan Guha


Re: pyspark dataframe codes for lead lag to column

2016-10-20 Thread ayan guha
Yes, there are similar functions available; depending on your Spark version,
look up the pyspark.sql.functions module documentation. I also prefer to use SQL
directly within pyspark.
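A minimal sketch, assuming a DataFrame df with an ordering column "id" and a value column "v"; -1 is passed as the default so the first lag and last lead match the example further down:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.orderBy("id")

df.select("id", "v",
          F.lag("v", 1, -1).over(w).alias("lag_v"),
          F.lead("v", 1, -1).over(w).alias("lead_v")).show()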

On Thu, Oct 20, 2016 at 8:18 PM, Mendelson, Assaf <assaf.mendel...@rsa.com>
wrote:

> Depending on your usecase, you may want to take a look at window functions
>
>
>
> *From:* muhammet pakyürek [mailto:mpa...@hotmail.com]
> *Sent:* Thursday, October 20, 2016 11:36 AM
> *To:* user@spark.apache.org
> *Subject:* pyspark dataframe codes for lead lag to column
>
>
>
>
>
>
>
> is there pyspark dataframe code for lead/lag on a column?
>
>
>
> the lead/lag columns would look something like
>
> value | lag | lead
> 1     | -1  | 2
> 2     | 1   | 3
> 3     | 2   | 4
> 4     | 3   | 5
> 5     | 4   | -1
>



-- 
Best Regards,
Ayan Guha


Re: [Spark][issue]Writing Hive Partitioned table

2016-10-19 Thread ayan guha
Hi Group

Sorry to rekindle this thread.

Using Spark 1.6.0 on CDH 5.7.

Any idea?


Best
Ayan

On Fri, Oct 7, 2016 at 5:08 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hi Ayan,
>
> Depends on the version of Spark you are using.
>
> Have you tried updating stats in Hive?
>
> ANALYZE TABLE ${DATABASE}.${TABLE} PARTITION (${PARTITION_NAME}) COMPUTE
> STATISTICS FOR COLUMNS
>
> and then do
>
> show create table ${TABLE}
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 7 October 2016 at 03:46, ayan guha <guha.a...@gmail.com> wrote:
>
>> Posting with correct subject.
>>
>> On Fri, Oct 7, 2016 at 12:37 PM, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> Faced one issue:
>>>
>>> - Writing Hive Partitioned table using
>>>
>>> df.withColumn("partition_date",to_date(df["INTERVAL_DATE"]))
>>> .write.partitionBy('partition_date').saveAsTable("sometable"
>>> ,mode="overwrite")
>>>
>>> - Data got written to HDFS fine. I can see the folders with partition
>>> names such as
>>>
>>> /app/somedb/hive/somedb.db/sometable/partition_date=2016-09-28
>>> /app/somedb/hive/somedb.db/sometable/partition_date=2016-09-29
>>>
>>> and so on.
>>> - Also, _common_metadata & _metadata files are written properly
>>>
>>> - I can read data from spark fine using 
>>> read.parquet("/app/somedb/hive/somedb.db/sometable").
>>> Printschema showing all columns.
>>>
>>> - However, I can not read from hive.
>>>
>>> Problem 1: Hive does not think the table is partitioned
>>> Problem 2: Hive sees only 1 column
>>> array from deserializer
>>> Problem 3: MSCK repair table failed, saying partitions are not in
>>> Metadata.
>>>
>>> Question: Is it a known issue with Spark to write to Hive partitioned
>>> table?
>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


-- 
Best Regards,
Ayan Guha


<    1   2   3   4   5   6   7   8   >