Re: FPGrowth Model is taking too long to generate frequent item sets

2017-03-14 Thread Raju Bairishetti
Hi Yuhao,
I have tried numPartitions values of (numExecutors * numExecutorCores),
1000, 2000, and 1, but did not see much improvement.

Having more partitions solved some performance issues, but I did not see any
improvement when I use a lower min-support.

With a lower min-support value, it generates 260 million frequent itemsets
from the 63K transactions and 200K items in total.
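
For reference, a minimal sketch of how those two knobs are set on MLlib's
FPGrowth. The input path, parsing, and the concrete threshold are only
placeholders: if min-support is meant to correspond to roughly
(200K items / 63K transactions) ~ 3 occurrences, that is about
3/63000 ~ 5e-5 as a fraction of transactions.

import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD

// Hypothetical input: one whitespace-separated basket per line.
val transactions: RDD[Array[String]] = sc
  .textFile("hdfs:///path/to/transactions")   // placeholder path
  .map(_.trim.split("\\s+"))

val model = new FPGrowth()
  .setMinSupport(5e-5)      // ~3 occurrences out of 63K transactions, per the heuristic above
  .setNumPartitions(2000)   // one of the values tried in this thread
  .run(transactions)

// Frequent itemsets and their counts.
model.freqItemsets.take(10).foreach { fi =>
  println(fi.items.mkString("[", ",", "]") + " -> " + fi.freq)
}

With 260 million itemsets in the output, much of the time usually goes into
generating and shuffling the result itself, so raising minSupport (or capping
basket length) tends to be what actually shrinks the runtime.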

On Tue, Mar 14, 2017 at 3:30 PM, Yuhao Yang <hhb...@gmail.com> wrote:

> Hi Raju,
>
> Have you tried setNumPartitions with a larger number?
>
> 2017-03-07 0:30 GMT-08:00 Eli Super <eli.su...@gmail.com>:
>
>> Hi
>>
>> It's an area of knowledge; you will need to spend several hours reading
>> about it online.
>>
>> What is your programming language ?
>>
>> Try searching online for: "machine learning binning %my_programming_language%"
>> and
>> "machine learning feature engineering %my_programming_language%"
>>
>> On Tue, Mar 7, 2017 at 3:39 AM, Raju Bairishetti <r...@apache.org> wrote:
>>
>>> @Eli, thanks for the suggestion. If you do not mind, could you please
>>> elaborate on those approaches?
>>>
>>> On Mon, Mar 6, 2017 at 7:29 PM, Eli Super <eli.su...@gmail.com> wrote:
>>>
>>>> Hi
>>>>
>>>> Try to implement binning and/or feature engineering (smart feature
>>>> selection for example)
>>>>
>>>> Good luck
>>>>
>>>> On Mon, Mar 6, 2017 at 6:56 AM, Raju Bairishetti <r...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>   I am new to Spark MLlib. I am using the FPGrowth model to find
>>>>> related items.
>>>>>
>>>>> The number of transactions is 63K, and the total number of items across
>>>>> all transactions is 200K.
>>>>>
>>>>> I am running the FPGrowth model to generate frequent itemsets, and it is
>>>>> taking a huge amount of time. *I am setting the min-support value such
>>>>> that each item appears at least ~(number of items)/(number of
>>>>> transactions) times.*
>>>>>
>>>>> It takes a lot of time if I allow an item to appear at least once in the
>>>>> database.
>>>>>
>>>>> If I give a higher min-support value, the output is much smaller.
>>>>>
>>>>> Could anyone please guide me on how to reduce the execution time for
>>>>> generating frequent itemsets?
>>>>>
>>>>> --
>>>>> Thanks,
>>>>> Raju Bairishetti,
>>>>> www.lazada.com
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>>
>>> --
>>> Thanks,
>>> Raju Bairishetti,
>>> www.lazada.com
>>>
>>
>>
>


-- 

--
Thanks,
Raju Bairishetti,
www.lazada.com


Re: FPGrowth Model is taking too long to generate frequent item sets

2017-03-06 Thread Raju Bairishetti
@Eli, thanks for the suggestion. If you do not mind, could you please
elaborate on those approaches?

On Mon, Mar 6, 2017 at 7:29 PM, Eli Super <eli.su...@gmail.com> wrote:

> Hi
>
> Try to implement binning and/or feature engineering (smart feature
> selection for example)
>
> Good luck
>
> On Mon, Mar 6, 2017 at 6:56 AM, Raju Bairishetti <r...@apache.org> wrote:
>
>> Hi,
>>   I am new to Spark MLlib. I am using the FPGrowth model to find related
>> items.
>>
>> The number of transactions is 63K, and the total number of items across
>> all transactions is 200K.
>>
>> I am running the FPGrowth model to generate frequent itemsets, and it is
>> taking a huge amount of time. *I am setting the min-support value such
>> that each item appears at least ~(number of items)/(number of
>> transactions) times.*
>>
>> It takes a lot of time if I allow an item to appear at least once in the
>> database.
>>
>> If I give a higher min-support value, the output is much smaller.
>>
>> Could anyone please guide me on how to reduce the execution time for
>> generating frequent itemsets?
>>
>> --
>> Thanks,
>> Raju Bairishetti,
>> www.lazada.com
>>
>
>


-- 

--
Thanks,
Raju Bairishetti,
www.lazada.com


FPGrowth Model is taking too long to generate frequent item sets

2017-03-05 Thread Raju Bairishetti
Hi,
  I am new to Spark MLlib. I am using the FPGrowth model to find related
items.

The number of transactions is 63K, and the total number of items across all
transactions is 200K.

I am running the FPGrowth model to generate frequent itemsets, and it is
taking a huge amount of time. *I am setting the min-support value such that
each item appears at least ~(number of items)/(number of transactions)
times.*

It takes a lot of time if I allow an item to appear at least once in the
database.

If I give a higher min-support value, the output is much smaller.

Could anyone please guide me on how to reduce the execution time for
generating frequent itemsets?

--
Thanks,
Raju Bairishetti,
www.lazada.com


Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

2017-01-17 Thread Raju Bairishetti
Thanks Yong for the response. Adding my responses inline

On Tue, Jan 17, 2017 at 10:27 PM, Yong Zhang <java8...@hotmail.com> wrote:

> What DB you are using for your Hive meta store, and what types are your
> partition columns?
>
I am using MySQL for the Hive metastore. The partition columns are a
combination of INT and STRING types.

>
> You may want to read the discussion in SPARK-6910, and especially the
> comments in the PR. There are some limitations around partition pruning in
> Hive/Spark; maybe yours is one of them.
>
I had already gone through SPARK-6910 and all of the corresponding PRs, and
had set the properties below based on them:
*spark.sql.hive.convertMetastoreParquet   false*
*spark.sql.hive.metastorePartitionPruning   true*


>

> Yong
>
>
> --
> *From:* Raju Bairishetti <r...@apache.org>
> *Sent:* Tuesday, January 17, 2017 3:00 AM
> *To:* user @spark
> *Subject:* Re: Spark sql query plan contains all the partitions from hive
> table even though filtering of partitions is provided
>
> I had a high-level look into the code. It seems the getHiveQlPartitions
> method of HiveMetastoreCatalog is getting called irrespective of the
> metastorePartitionPruning conf value.
>
> It should not fetch all partitions if we set metastorePartitionPruning to
> true (the default value is false).
>
> def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
>   val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
>     table.getPartitions(predicates)
>   } else {
>     allPartitions
>   }
>
> ...
>
> def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
>   client.getPartitionsByFilter(this, predicates)
>
> lazy val allPartitions = table.getAllPartitions
>
> But somehow getAllPartitions is getting called even after setting
> metastorePartitionPruning to true.
>
> Am I missing something or looking at the wrong place?
>
>
> On Mon, Jan 16, 2017 at 12:53 PM, Raju Bairishetti <r...@apache.org>
> wrote:
>
>> Waiting for suggestions/help on this...
>>
>> On Wed, Jan 11, 2017 at 12:14 PM, Raju Bairishetti <r...@apache.org>
>> wrote:
>>
>>> Hello,
>>>
>>> Spark SQL is generating the query plan with information for all
>>> partitions even though we apply partition filters in the query. Due to
>>> this, the Spark driver/Hive metastore is hitting OOM errors, as each
>>> table has lots of partitions.
>>>
>>> We can confirm from hive audit logs that it tries to *fetch all
>>> partitions* from hive metastore.
>>>
>>>  2016-12-28 07:18:33,749 INFO  [pool-4-thread-184]: HiveMetaStore.audit
>>> (HiveMetaStore.java:logAuditEvent(371)) - ugi=rajubip=/x.x.x.x
>>> cmd=get_partitions : db= tbl=x
>>>
>>>
>>> Configured the following parameters in the Spark conf to fix the above
>>> issue (source: Spark JIRA & GitHub pull requests):
>>>
>>> *spark.sql.hive.convertMetastoreParquet   false *
>>> *spark.sql.hive.metastorePartitionPruning   true*
>>>
>>>
>>> *   plan:  rdf.explain *
>>> *   == Physical Plan ==*
>>>HiveTableScan [rejection_reason#626], MetastoreRelation dbname,
>>> tablename, None,   [(year#314 = 2016),(month#315 = 12),(day#316 =
>>> 28),(hour#317 = 2),(venture#318 = DEFAULT)]
>>>
>>> With these settings, the *get_partitions_by_filter* method is called and
>>> only the required partitions are fetched.
>>>
>>> But we are frequently seeing Parquet decode errors in our applications
>>> after this. It looks like these decoding errors are because the serde
>>> changed from the Spark built-in serde to the Hive serde.
>>>
>>> I feel that *fixing query plan generation in Spark SQL* is the right
>>> approach, instead of forcing users to use the Hive serde.
>>>
>>> Is there any workaround/way to fix this issue? I would like to hear more
>>> thoughts on this :)
>>>
>>> --
>>> Thanks,
>>> Raju Bairishetti,
>>> www.lazada.com
>>>
>>
>>
>>
>> --
>>
>> --
>> Thanks,
>> Raju Bairishetti,
>> www.lazada.com
>>
>
>
>
> --
>
> --
> Thanks,
> Raju Bairishetti,
> www.lazada.com
>



-- 

--
Thanks,
Raju Bairishetti,
www.lazada.com


Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

2017-01-17 Thread Raju Bairishetti
I had a high-level look into the code. It seems the getHiveQlPartitions
method of HiveMetastoreCatalog is getting called irrespective of the
metastorePartitionPruning conf value.

It should not fetch all partitions if we set metastorePartitionPruning to
true (the default value is false).

def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
  val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
    table.getPartitions(predicates)
  } else {
    allPartitions
  }

...

def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
  client.getPartitionsByFilter(this, predicates)

lazy val allPartitions = table.getAllPartitions

But somehow getAllPartitions is getting called even after setting
metastorePartitionPruning to true.

Am I missing something or looking at the wrong place?
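
For anyone hitting the same thing, a quick sanity check (just a sketch,
assuming a HiveContext named sqlContext as in the snippet above) is to
confirm what the running context actually reports for these keys and then
re-check the physical plan:

sqlContext.setConf("spark.sql.hive.metastorePartitionPruning", "true")
sqlContext.setConf("spark.sql.hive.convertMetastoreParquet", "false")

println(sqlContext.getConf("spark.sql.hive.metastorePartitionPruning"))
println(sqlContext.getConf("spark.sql.hive.convertMetastoreParquet"))

// A pruned plan should show the partition predicates on the HiveTableScan,
// and the metastore audit log should show get_partitions_by_filter.
sqlContext.sql("SELECT * FROM dbname.tablename WHERE year = 2016 AND month = 12").explain()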


On Mon, Jan 16, 2017 at 12:53 PM, Raju Bairishetti <r...@apache.org> wrote:

> Waiting for suggestions/help on this...
>
> On Wed, Jan 11, 2017 at 12:14 PM, Raju Bairishetti <r...@apache.org>
> wrote:
>
>> Hello,
>>
>> Spark SQL is generating the query plan with information for all
>> partitions even though we apply partition filters in the query. Due to
>> this, the Spark driver/Hive metastore is hitting OOM errors, as each table
>> has lots of partitions.
>>
>> We can confirm from hive audit logs that it tries to *fetch all
>> partitions* from hive metastore.
>>
>>  2016-12-28 07:18:33,749 INFO  [pool-4-thread-184]: HiveMetaStore.audit
>> (HiveMetaStore.java:logAuditEvent(371)) - ugi=rajubip=/x.x.x.x
>> cmd=get_partitions : db= tbl=x
>>
>>
>> Configured the following parameters in the Spark conf to fix the above
>> issue (source: Spark JIRA & GitHub pull requests):
>>
>> *spark.sql.hive.convertMetastoreParquet   false*
>> *spark.sql.hive.metastorePartitionPruning   true*
>>
>>
>> *   plan:  rdf.explain*
>> *   == Physical Plan ==*
>>HiveTableScan [rejection_reason#626], MetastoreRelation dbname,
>> tablename, None,   [(year#314 = 2016),(month#315 = 12),(day#316 =
>> 28),(hour#317 = 2),(venture#318 = DEFAULT)]
>>
>> With these settings, the *get_partitions_by_filter* method is called and
>> only the required partitions are fetched.
>>
>> But we are frequently seeing Parquet decode errors in our applications
>> after this. It looks like these decoding errors are because the serde
>> changed from the Spark built-in serde to the Hive serde.
>>
>> I feel that *fixing query plan generation in Spark SQL* is the right
>> approach, instead of forcing users to use the Hive serde.
>>
>> Is there any workaround/way to fix this issue? I would like to hear more
>> thoughts on this :)
>>
>> --
>> Thanks,
>> Raju Bairishetti,
>> www.lazada.com
>>
>
>
>
> --
>
> --
> Thanks,
> Raju Bairishetti,
> www.lazada.com
>



-- 

--
Thanks,
Raju Bairishetti,
www.lazada.com


Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

2017-01-15 Thread Raju Bairishetti
Waiting for suggestions/help on this...

On Wed, Jan 11, 2017 at 12:14 PM, Raju Bairishetti <r...@apache.org> wrote:

> Hello,
>
> Spark SQL is generating the query plan with information for all partitions
> even though we apply partition filters in the query. Due to this, the Spark
> driver/Hive metastore is hitting OOM errors, as each table has lots of
> partitions.
>
> We can confirm from hive audit logs that it tries to *fetch all
> partitions* from hive metastore.
>
>  2016-12-28 07:18:33,749 INFO  [pool-4-thread-184]: HiveMetaStore.audit
> (HiveMetaStore.java:logAuditEvent(371)) - ugi=rajubip=/x.x.x.x
> cmd=get_partitions : db= tbl=x
>
>
> Configured the following parameters in the Spark conf to fix the above
> issue (source: Spark JIRA & GitHub pull requests):
>
> *spark.sql.hive.convertMetastoreParquet   false*
> *spark.sql.hive.metastorePartitionPruning   true*
>
>
> *   plan:  rdf.explain*
> *   == Physical Plan ==*
>HiveTableScan [rejection_reason#626], MetastoreRelation dbname,
> tablename, None,   [(year#314 = 2016),(month#315 = 12),(day#316 =
> 28),(hour#317 = 2),(venture#318 = DEFAULT)]
>
> With these settings, the *get_partitions_by_filter* method is called and
> only the required partitions are fetched.
>
> But we are frequently seeing Parquet decode errors in our applications
> after this. It looks like these decoding errors are because the serde
> changed from the Spark built-in serde to the Hive serde.
>
> I feel that *fixing query plan generation in Spark SQL* is the right
> approach, instead of forcing users to use the Hive serde.
>
> Is there any workaround/way to fix this issue? I would like to hear more
> thoughts on this :)
>
> --
> Thanks,
> Raju Bairishetti,
> www.lazada.com
>



-- 

--
Thanks,
Raju Bairishetti,
www.lazada.com


Re: Spark Log information

2017-01-15 Thread Raju Bairishetti
Total number of tasks in the stage is: 21428
Number of tasks completed so far:  44
Number of tasks running now: 48
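
For completeness, the same counters can be read programmatically (a small
sketch, assuming a SparkContext named sc) instead of parsing the console
progress bar:

for (stageId <- sc.statusTracker.getActiveStageIds();
     stage <- sc.statusTracker.getStageInfo(stageId)) {
  println(s"stage $stageId: ${stage.numCompletedTasks()} completed, " +
    s"${stage.numActiveTasks()} running, ${stage.numTasks()} total")
}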

On Mon, Jan 16, 2017 at 11:41 AM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Hi,
>
> When running Spark jobs, I see the numbers in stages. Can anyone tell me
> what these numbers indicate in the case below?
>
> [Stage 2:>(44 + 48) /
> 21428]
>
> 44+48 and 21428.
>
> Thanks,
> Asmath
>
>


-- 

--
Thanks,
Raju Bairishetti,
www.lazada.com


Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

2017-01-10 Thread Raju Bairishetti
Hello,

   Spark SQL is generating the query plan with information for all partitions
even though we apply partition filters in the query. Due to this, the Spark
driver/Hive metastore is hitting OOM errors, as each table has lots of
partitions.

We can confirm from hive audit logs that it tries to *fetch all partitions*
from hive metastore.

 2016-12-28 07:18:33,749 INFO  [pool-4-thread-184]: HiveMetaStore.audit
(HiveMetaStore.java:logAuditEvent(371)) - ugi=rajubip=/x.x.x.x
cmd=get_partitions : db= tbl=x


Configured the following parameters in the Spark conf to fix the above
issue (source: Spark JIRA & GitHub pull requests):

*spark.sql.hive.convertMetastoreParquet   false*
*spark.sql.hive.metastorePartitionPruning   true*
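
For reference, one way to supply these two settings when the context is
created (a sketch only; the app name and the literal query are placeholders,
while the table and partition columns are taken from the plan below):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val conf = new SparkConf()
  .setAppName("partition-pruning-example")                  // placeholder name
  .set("spark.sql.hive.convertMetastoreParquet", "false")   // fall back to the Hive serde
  .set("spark.sql.hive.metastorePartitionPruning", "true")  // push predicates to the metastore

val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)

val rdf = sqlContext.sql(
  "SELECT rejection_reason FROM dbname.tablename " +
  "WHERE year = 2016 AND month = 12 AND day = 28 AND hour = 2 AND venture = 'DEFAULT'")
rdf.explain()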


*   plan:  rdf.explain*
*   == Physical Plan ==*
   HiveTableScan [rejection_reason#626], MetastoreRelation dbname,
tablename, None,   [(year#314 = 2016),(month#315 = 12),(day#316 =
28),(hour#317 = 2),(venture#318 = DEFAULT)]

With these settings, the *get_partitions_by_filter* method is called and
only the required partitions are fetched.

But we are frequently seeing Parquet decode errors in our applications after
this. It looks like these decoding errors are because the serde changed from
the Spark built-in serde to the Hive serde.

I feel that *fixing query plan generation in Spark SQL* is the right
approach, instead of forcing users to use the Hive serde.

Is there any workaround/way to fix this issue? I would like to hear more
thoughts on this :)

--
Thanks,
Raju Bairishetti,
www.lazada.com


Re: Spark Dataframe: Save to hdfs is taking long time

2016-12-28 Thread Raju Bairishetti
Try setting the number of partitions to (number of executors * number of
cores) while writing to the destination location.

You should be very careful while setting the number of partitions, as an
incorrect number may lead to a shuffle.
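
One detail worth checking in the snippet quoted below: coalesce returns a new
DataFrame rather than changing the existing one, so its result has to be
captured and written out. A sketch of that pattern, reusing the names from the
quoted code (numPartitions, saveMode, format, Codec, compressCodec and
outputPath are assumed to be defined as in that snippet):

import org.apache.spark.storage.StorageLevel

val results_dataframe = sqlContext.sql(
  "select gt.*, ct.* from PredictTempTable pt, ClusterTempTable ct, GamificationTempTable gt " +
  "where gt.vin = pt.vin and pt.cluster = ct.cluster")

// coalesce/repartition are lazy and return a new DataFrame; write the returned one.
val coalesced = results_dataframe
  .coalesce(numPartitions)
  .persist(StorageLevel.MEMORY_AND_DISK)

coalesced.write
  .mode(saveMode)
  .format(format)                 // e.g. "parquet", as mentioned in this thread
  .option(Codec, compressCodec)   // option key/value as in the quoted snippet
  .save(outputPath)

coalesce only merges existing partitions and avoids a full shuffle, while
repartition always shuffles; that is the trade-off referred to above.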

On Fri, Dec 16, 2016 at 12:56 PM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> I am trying to save the files as Parquet.
>
> On Thu, Dec 15, 2016 at 10:41 PM, Felix Cheung <felixcheun...@hotmail.com>
> wrote:
>
>> What is the format?
>>
>>
>> --
>> *From:* KhajaAsmath Mohammed <mdkhajaasm...@gmail.com>
>> *Sent:* Thursday, December 15, 2016 7:54:27 PM
>> *To:* user @spark
>> *Subject:* Spark Dataframe: Save to hdfs is taking long time
>>
>> Hi,
>>
>> I am facing an issue while saving the dataframe back to HDFS. It's taking
>> a long time to run.
>>
>> val results_dataframe = sqlContext.sql("select gt.*,ct.* from 
>> PredictTempTable pt,ClusterTempTable ct,GamificationTempTable gt where 
>> gt.vin=pt.vin and pt.cluster=ct.cluster")
>> results_dataframe.coalesce(numPartitions)
>> results_dataframe.persist(StorageLevel.MEMORY_AND_DISK)
>>
>> dataFrame.write.mode(saveMode).format(format)
>>   .option(Codec, compressCodec) //"org.apache.hadoop.io.compress.snappyCodec"
>>   .save(outputPath)
>>
>> It was taking a long time, and the total number of records for this
>> dataframe is 4903764.
>>
>> I even increased the number of partitions from 10 to 20, still no luck. Can
>> anyone help me in resolving this performance issue?
>>
>> Thanks,
>>
>> Asmath
>>
>>
>


-- 

--
Thanks,
Raju Bairishetti,
www.lazada.com


Re: How to run hive queries in async mode using spark sql

2016-05-24 Thread Raju Bairishetti
Hi Mich,

  Thanks for the response.

Yes, I do not want to block until the Hive query is completed, and I want to
know whether there is any way to poll the status/progress of the submitted
query.

I can turn on async mode for Hive queries in Spark SQL, but how do I track the
status of the submitted query?
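
Spark SQL does not appear to expose a Hive-style operation handle (see below),
but one possible workaround (only a sketch, assuming a SparkContext sc and a
HiveContext sqlContext; the query is hypothetical) is to run the blocking
action in a Future under a named job group and poll the status tracker for
that group:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val groupId = "async-hive-query-1"   // an arbitrary label, not a Hive operation handle

val resultF = Future {
  // Set the job group on the thread that actually runs the query.
  sc.setJobGroup(groupId, "hive query submitted via spark sql")
  sqlContext.sql("SELECT count(*) FROM some_hive_table").collect()
}

// Elsewhere (e.g. a monitoring loop), inspect progress for that group.
for (jobId <- sc.statusTracker.getJobIdsForGroup(groupId);
     jobInfo <- sc.statusTracker.getJobInfo(jobId)) {
  println(s"job $jobId status: ${jobInfo.status()}")
}

sc.cancelJobGroup(groupId) can cancel the whole group; the query itself still
blocks the background thread until it finishes.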

On Tue, May 24, 2016 at 6:48 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> By Hive queries in async mode, do you mean submitting SQL queries to Hive,
> moving on to the next operation, and waiting for the result set to return
> from Hive?
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 24 May 2016 at 11:26, Raju Bairishetti <raju@gmail.com> wrote:
>
>> Any thoughts on this?
>>
>> In Hive, it returns an operation handle. This handle can be used for
>> fetching the status of the query. Is there any similar mechanism in Spark
>> SQL? It looks like almost all the methods in HiveContext are either
>> protected or private.
>>
>> On Wed, May 18, 2016 at 9:03 AM, Raju Bairishetti <raju@gmail.com>
>> wrote:
>>
>>> I am using Spark SQL for running Hive queries as well. Is there any way
>>> to run Hive queries in async mode using Spark SQL?
>>>
>>> Does it return any Hive handle, and if so, how do I get the results from
>>> the Hive handle using Spark SQL?
>>>
>>> --
>>> Thanks,
>>> Raju Bairishetti,
>>>
>>> www.lazada.com
>>>
>>>
>>>
>>
>>
>> --
>> Thanks,
>> Raju Bairishetti,
>>
>> www.lazada.com
>>
>>
>>
>


-- 
Thanks,
Raju Bairishetti,

www.lazada.com


Re: How to run hive queries in async mode using spark sql

2016-05-24 Thread Raju Bairishetti
Any thoughts on this?

In Hive, it returns an operation handle. This handle can be used for fetching
the status of the query. Is there any similar mechanism in Spark SQL? It looks
like almost all the methods in HiveContext are either protected or private.

On Wed, May 18, 2016 at 9:03 AM, Raju Bairishetti <raju@gmail.com>
wrote:

> I am using Spark SQL for running Hive queries as well. Is there any way to
> run Hive queries in async mode using Spark SQL?
>
> Does it return any Hive handle, and if so, how do I get the results from the
> Hive handle using Spark SQL?
>
> --
> Thanks,
> Raju Bairishetti,
>
> www.lazada.com
>
>
>


-- 
Thanks,
Raju Bairishetti,

www.lazada.com


How to run hive queries in async mode using spark sql

2016-05-17 Thread Raju Bairishetti
I am using Spark SQL for running Hive queries as well. Is there any way to
run Hive queries in async mode using Spark SQL?

Does it return any Hive handle, and if so, how do I get the results from the
Hive handle using Spark SQL?

-- 
Thanks,
Raju Bairishetti,

www.lazada.com


Re: [Streaming-Kafka] How to start from topic offset when streamcontext is using checkpoint

2016-01-24 Thread Raju Bairishetti
Hi Yash,
   Basically, my question is how to avoid storing the Kafka offsets in the
Spark checkpoint directory. The streaming context is getting built from the
checkpoint directory and proceeding with the offsets in the checkpointed RDD.

I want to consume data from Kafka from specific offsets while still using
Spark checkpoints. The streaming context is getting prepared from the
checkpoint directory and starts consuming from the topic offsets that were
stored in the checkpoint directory.
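
One common pattern for keeping offset management in your own hands (a sketch
based on the direct-stream API, not something prescribed in this thread;
saveOffsets is a hypothetical persistence function and stream is assumed to
be the DStream returned by createDirectStream) is to read the offset ranges
from every batch and store them yourself, then feed them back as fromOffsets
on a clean start instead of relying on the checkpoint:

import kafka.common.TopicAndPartition
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

stream.foreachRDD { rdd =>
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Process the batch first, then record how far we got for each topic-partition.
  offsetRanges.foreach { range =>
    saveOffsets(TopicAndPartition(range.topic, range.partition), range.untilOffset)
  }
}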


On Sat, Jan 23, 2016 at 3:44 PM, Yash Sharma <yash...@gmail.com> wrote:

> Hi Raju,
> Could you please explain your expected behavior with the DStream? The
> DStream will have events only from the 'fromOffsets' that you provided in
> the createDirectStream (which I think is the expected behavior).
>
> For the smaller files, you will have to deal with small files if you intend
> to write them immediately. Alternately, what we sometimes do is:
>
> 1. Maintain a couple of iterations for some 30-40 seconds in the application
> until we have substantial data, and then write it to disk.
> 2. Push the smaller data back to Kafka, and let a different job handle the
> save to disk.
>
> On Sat, Jan 23, 2016 at 7:01 PM, Raju Bairishetti <r...@apache.org> wrote:
>
>> Thanks for the quick reply.
>> I am creating the Kafka DStream by passing an offsets map. I have pasted a
>> code snippet in my earlier mail. Let me know if I am missing something.
>>
>> I want to use the Spark checkpoint only for handling driver/executor
>> failures.
>> On Jan 22, 2016 10:08 PM, "Cody Koeninger" <c...@koeninger.org> wrote:
>>
>>> Offsets are stored in the checkpoint.  If you want to manage offsets
>>> yourself, don't restart from the checkpoint, specify the starting offsets
>>> when you create the stream.
>>>
>>> Have you read / watched the materials linked from
>>>
>>> https://github.com/koeninger/kafka-exactly-once
>>>
>>> Regarding the small files problem, either don't use HDFS, or use
>>> something like filecrush for merging.
>>>
>>> On Fri, Jan 22, 2016 at 3:03 AM, Raju Bairishetti <r...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>>
>>>>I am very new to spark & spark-streaming. I am planning to use spark
>>>> streaming for real time processing.
>>>>
>>>> I have created a streaming context and am checkpointing to an HDFS
>>>> directory for recovery purposes in case of executor and driver failures.
>>>>
>>>> I am creating the DStream with an offset map for getting the data from
>>>> Kafka. I am simply ignoring the offsets to understand the behavior.
>>>> Whenever I restart the application, the driver is restored from the
>>>> checkpoint as expected, but the DStream does not start from the initial
>>>> offsets. The DStream was created with the last consumed offsets instead
>>>> of starting from offset 0 for each topic partition, as I am not storing
>>>> the offsets anywhere.
>>>>
>>>> def main : Unit = {
>>>>
>>>> var sparkStreamingContext = 
>>>> StreamingContext.getOrCreate(SparkConstants.CHECKPOINT_DIR_LOCATION,
>>>>   () => creatingFunc())
>>>>
>>>> ...
>>>>
>>>>
>>>> }
>>>>
>>>> def creatingFunc(): Unit = {
>>>>
>>>> ...
>>>>
>>>> var offsets:Map[TopicAndPartition, Long] = 
>>>> Map(TopicAndPartition("sample_sample3_json",0) -> 0)
>>>>
>>>> KafkaUtils.createDirectStream[String,String, StringDecoder, 
>>>> StringDecoder,
>>>> String](sparkStreamingContext, kafkaParams, offsets, messageHandler)
>>>>
>>>> ...
>>>> }
>>>>
>>>> I want to get control over offset management at the event level instead
>>>> of the RDD level, to make sure of at-least-once delivery to the end
>>>> system.
>>>>
>>>> As per my understanding, every RDD or RDD partition will be stored in
>>>> HDFS as a file if I choose to use HDFS as the output. If I use 1 second
>>>> as the batch interval, it will end up with a huge number of small files
>>>> in HDFS. Having small files in HDFS leads to lots of other issues.
>>>> Is there any way to write multiple RDDs into a single file? I don't have
>>>> much idea about *coalesce* usage. In the worst case, I can merge all
>>>> small files in HDFS at regular intervals.
>>>>
>>>> Thanks...
>>>>
>>>> --
>>>> Thanks
>>>> Raju Bairishetti
>>>> www.lazada.com
>>>>
>>>>
>>>>
>>>>
>>>
>


-- 

--
Thanks
Raju Bairishetti
www.lazada.com


Re: [Streaming-Kafka] How to start from topic offset when streamcontext is using checkpoint

2016-01-23 Thread Raju Bairishetti
Thanks for the quick reply.
I am creating the Kafka DStream by passing an offsets map. I have pasted a
code snippet in my earlier mail. Let me know if I am missing something.

I want to use the Spark checkpoint only for handling driver/executor failures.
On Jan 22, 2016 10:08 PM, "Cody Koeninger" <c...@koeninger.org> wrote:

> Offsets are stored in the checkpoint.  If you want to manage offsets
> yourself, don't restart from the checkpoint, specify the starting offsets
> when you create the stream.
>
> Have you read / watched the materials linked from
>
> https://github.com/koeninger/kafka-exactly-once
>
> Regarding the small files problem, either don't use HDFS, or use something
> like filecrush for merging.
>
> On Fri, Jan 22, 2016 at 3:03 AM, Raju Bairishetti <r...@apache.org> wrote:
>
>> Hi,
>>
>>
>>I am very new to spark & spark-streaming. I am planning to use spark
>> streaming for real time processing.
>>
>> I have created a streaming context and am checkpointing to an HDFS
>> directory for recovery purposes in case of executor and driver failures.
>>
>> I am creating the DStream with an offset map for getting the data from
>> Kafka. I am simply ignoring the offsets to understand the behavior.
>> Whenever I restart the application, the driver is restored from the
>> checkpoint as expected, but the DStream does not start from the initial
>> offsets. The DStream was created with the last consumed offsets instead of
>> starting from offset 0 for each topic partition, as I am not storing the
>> offsets anywhere.
>>
>> def main : Unit = {
>>
>> var sparkStreamingContext = 
>> StreamingContext.getOrCreate(SparkConstants.CHECKPOINT_DIR_LOCATION,
>>   () => creatingFunc())
>>
>> ...
>>
>>
>> }
>>
>> def creatingFunc(): Unit = {
>>
>> ...
>>
>> var offsets:Map[TopicAndPartition, Long] = 
>> Map(TopicAndPartition("sample_sample3_json",0) -> 0)
>>
>> KafkaUtils.createDirectStream[String,String, StringDecoder, 
>> StringDecoder,
>> String](sparkStreamingContext, kafkaParams, offsets, messageHandler)
>>
>> ...
>> }
>>
>> I want to get control over offset management at the event level instead of
>> the RDD level, to make sure of at-least-once delivery to the end system.
>>
>> As per my understanding, every RDD or RDD partition will be stored in HDFS
>> as a file if I choose to use HDFS as the output. If I use 1 second as the
>> batch interval, it will end up with a huge number of small files in HDFS.
>> Having small files in HDFS leads to lots of other issues.
>> Is there any way to write multiple RDDs into a single file? I don't have
>> much idea about *coalesce* usage. In the worst case, I can merge all small
>> files in HDFS at regular intervals.
>>
>> Thanks...
>>
>> --
>> Thanks
>> Raju Bairishetti
>> www.lazada.com
>>
>>
>>
>>
>


[Streaming-Kafka] How to start from topic offset when streamcontext is using checkpoint

2016-01-22 Thread Raju Bairishetti
Hi,


   I am very new to Spark & Spark Streaming. I am planning to use Spark
Streaming for real-time processing.

   I have created a streaming context and am checkpointing to an HDFS
directory for recovery purposes in case of executor and driver failures.

I am creating the DStream with an offset map for getting the data from Kafka.
I am simply ignoring the offsets to understand the behavior. Whenever I
restart the application, the driver is restored from the checkpoint as
expected, but the DStream does not start from the initial offsets. The DStream
was created with the last consumed offsets instead of starting from offset 0
for each topic partition, as I am not storing the offsets anywhere.

def main(args: Array[String]): Unit = {

  val sparkStreamingContext =
    StreamingContext.getOrCreate(SparkConstants.CHECKPOINT_DIR_LOCATION,
      () => creatingFunc())

  ...

}

def creatingFunc(): StreamingContext = {

  ...

  val offsets: Map[TopicAndPartition, Long] =
    Map(TopicAndPartition("sample_sample3_json", 0) -> 0)

  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder,
    StringDecoder, String](sparkStreamingContext, kafkaParams, offsets,
    messageHandler)

  ...
}

I want to get control over offset management at the event level instead of
the RDD level, to make sure of at-least-once delivery to the end system.

As per my understanding, every RDD or RDD partition will be stored in HDFS as
a file if I choose to use HDFS as the output. If I use 1 second as the batch
interval, it will end up with a huge number of small files in HDFS. Having
small files in HDFS leads to lots of other issues.
Is there any way to write multiple RDDs into a single file? I don't have much
idea about *coalesce* usage. In the worst case, I can merge all small files
in HDFS at regular intervals.
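
On the small-files point, a minimal sketch of the two usual options (purely
illustrative; the output paths and window duration are placeholders, and
stream is the DStream created above): either collapse each micro-batch to
fewer partitions before writing, or widen the write interval with window so
several batches are written together.

import org.apache.spark.streaming.Minutes

// Option 1: fewer files per batch - one output file per interval
// (only sensible if a single batch comfortably fits in one task).
stream.foreachRDD { (rdd, time) =>
  rdd.coalesce(1).saveAsTextFile(s"hdfs:///output/events-${time.milliseconds}")
}

// Option 2: fewer write intervals - buffer several 1-second batches
// and write them together every 5 minutes.
stream.window(Minutes(5), Minutes(5)).foreachRDD { (rdd, time) =>
  rdd.coalesce(4).saveAsTextFile(s"hdfs:///output/windowed-${time.milliseconds}")
}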

Thanks...

--
Thanks
Raju Bairishetti
www.lazada.com