Re: New Spark Datasource for Hive ACID tables

2019-07-27 Thread naresh Goud
Thanks Abhishek.
I will check it out.

Thank you,
Naresh

On Sat, Jul 27, 2019 at 9:21 PM Abhishek Somani 
wrote:

> Hey Naresh,
>
> There is a `shaded-dependencies` project inside the root directory. You
> need to go into that directory and build and publish it locally first.
>
> cd shaded-dependencies
>> sbt clean publishLocal
>>
>
> After that, come back out to the root directory and build that project.
> The spark-acid-shaded-dependencies jar will now be found:
>
>> cd ..
>> sbt assembly
>
>
> This will create the jar which you can use.
>
> On another note, unless you are making changes in the code, you don't need
> to build yourself as the jar is published in
> https://spark-packages.org/package/qubole/spark-acid. So you can just use
> it as:
>
> spark-shell --packages qubole:spark-acid:0.4.0-s_2.11
>
>
> ...and it will be automatically fetched and used.
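>
> For example, once the package is on the classpath, reading an ACID table
> could look roughly like this. This is a minimal sketch only: the "HiveAcid"
> format name, the "table" option and the table name are assumptions that
> should be checked against the README.
>
> // Hedged sketch; verify format/option names against the spark-acid README.
> val acidDf = spark.read
>   .format("HiveAcid")
>   .option("table", "default.acid_tbl")   // placeholder database.table
>   .load()
> acidDf.show()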
>
> Thanks,
> Abhishek
>
>
> On Sun, Jul 28, 2019 at 4:42 AM naresh Goud 
> wrote:
>
>> It looks like there is some internal dependency missing.
>>
>> libraryDependencies ++= Seq(
>> "com.qubole" %% "spark-acid-shaded-dependencies" % "0.1"
>> )
>>
>> How do we get it?
>>
>>
>> Thank you,
>> Naresh
>>
>>
>>
>>
>> Thanks,
>> Naresh
>> www.linkedin.com/in/naresh-dulam
>> http://hadoopandspark.blogspot.com/
>>
>>
>>
>> On Sat, Jul 27, 2019 at 5:34 PM naresh Goud 
>> wrote:
>>
>>> Hi Abhishek,
>>>
>>>
>>> We are not able to build the jar from the GitHub code; we get the error below.
>>>
>>> Has anyone else been able to build the jars? Is there anything else missing?
>>>
>>>
>>>
>>> Note: Unresolved dependencies path:
>>> [warn]  com.qubole:spark-acid-shaded-dependencies_2.11:0.1
>>> (C:\Data\Hadoop\spark-acid-master\build.sbt#L51-54)
>>> [warn]+- com.qubole:spark-acid_2.11:0.4.0
>>> sbt.ResolveException: unresolved dependency:
>>> com.qubole#spark-acid-shaded-dependencies_2.11;0.1: not found
>>> at sbt.IvyActions$.sbt$IvyActions$$resolve(IvyActions.scala:313)
>>> at
>>> sbt.IvyActions$$anonfun$updateEither$1.apply(IvyActions.scala:191)
>>> at
>>> sbt.IvyActions$$anonfun$updateEither$1.apply(IvyActions.scala:168)
>>> at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:156)
>>> at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:156)
>>> at sbt.IvySbt$$anonfun$withIvy$1.apply(Ivy.scala:133)
>>> at sbt.IvySbt.sbt$IvySbt$$action$1(Ivy.scala:57)
>>> at sbt.IvySbt$$anon$4.call(Ivy.scala:65)
>>> at xsbt.boot.Locks$GlobalLock.withChannel$1(Locks.scala:93)
>>> at
>>> xsbt.boot.Locks$GlobalLock.xsbt$boot$Locks$GlobalLock$$withChannelRetries$1(Locks.scala:78)
>>> at
>>> xsbt.boot.Locks$GlobalLock$$anonfun$withFileLock$1.apply(Locks.scala:97)
>>> at xsbt.boot.Using$.withResource(Using.scala:10)
>>> at xsbt.boot.Using$.apply(Using.scala:9)
>>> at
>>> xsbt.boot.Locks$GlobalLock.ignoringDeadlockAvoided(Locks.scala:58)
>>> at xsbt.boot.Locks$GlobalLock.withLock(Locks.scala:48)
>>> at xsbt.boot.Locks$.apply0(Locks.scala:31)
>>> at xsbt.boot.Locks$.apply(Locks.scala:28)
>>> at sbt.IvySbt.withDefaultLogger(Ivy.scala:65)
>>> at sbt.IvySbt.withIvy(Ivy.scala:128)
>>> at sbt.IvySbt.withIvy(Ivy.scala:125)
>>> at sbt.IvySbt$Module.withModule(Ivy.scala:156)
>>> at sbt.IvyActions$.updateEither(IvyActions.scala:168)
>>> at
>>> sbt.Classpaths$$anonfun$sbt$Classpaths$$work$1$1.apply(Defaults.scala:1541)
>>>
>>> Thanks,
>>> Naresh
>>> www.linkedin.com/in/naresh-dulam
>>> http://hadoopandspark.blogspot.com/
>>>
>>>
>>>
>>> On Sat, Jul 27, 2019 at 3:25 PM Nicolas Paris 
>>> wrote:
>>>
>>>> Congrats
>>>>
>>>> The read/write feature with hive3 is highly interesting
>>>>
>>>> On Fri, Jul 26, 2019 at 06:07:55PM +0530, Abhishek Somani wrote:
>>>> > Hi All,
>>>> >
>>>> > We at Qubole have open sourced a datasource that will enable users to
>>>> work on

Re: New Spark Datasource for Hive ACID tables

2019-07-27 Thread naresh Goud
It looks like there is some internal dependency missing.

libraryDependencies ++= Seq(
"com.qubole" %% "spark-acid-shaded-dependencies" % "0.1"
)

How do we get it?


Thank you,
Naresh




Thanks,
Naresh
www.linkedin.com/in/naresh-dulam
http://hadoopandspark.blogspot.com/



On Sat, Jul 27, 2019 at 5:34 PM naresh Goud 
wrote:

> Hi Abhishek,
>
>
> We are not able to build the jar from the GitHub code; we get the error below.
>
> Has anyone else been able to build the jars? Is there anything else missing?
>
>
>
> Note: Unresolved dependencies path:
> [warn]  com.qubole:spark-acid-shaded-dependencies_2.11:0.1
> (C:\Data\Hadoop\spark-acid-master\build.sbt#L51-54)
> [warn]+- com.qubole:spark-acid_2.11:0.4.0
> sbt.ResolveException: unresolved dependency:
> com.qubole#spark-acid-shaded-dependencies_2.11;0.1: not found
> at sbt.IvyActions$.sbt$IvyActions$$resolve(IvyActions.scala:313)
> at
> sbt.IvyActions$$anonfun$updateEither$1.apply(IvyActions.scala:191)
> at
> sbt.IvyActions$$anonfun$updateEither$1.apply(IvyActions.scala:168)
> at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:156)
> at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:156)
> at sbt.IvySbt$$anonfun$withIvy$1.apply(Ivy.scala:133)
> at sbt.IvySbt.sbt$IvySbt$$action$1(Ivy.scala:57)
> at sbt.IvySbt$$anon$4.call(Ivy.scala:65)
> at xsbt.boot.Locks$GlobalLock.withChannel$1(Locks.scala:93)
> at
> xsbt.boot.Locks$GlobalLock.xsbt$boot$Locks$GlobalLock$$withChannelRetries$1(Locks.scala:78)
> at
> xsbt.boot.Locks$GlobalLock$$anonfun$withFileLock$1.apply(Locks.scala:97)
> at xsbt.boot.Using$.withResource(Using.scala:10)
> at xsbt.boot.Using$.apply(Using.scala:9)
> at
> xsbt.boot.Locks$GlobalLock.ignoringDeadlockAvoided(Locks.scala:58)
> at xsbt.boot.Locks$GlobalLock.withLock(Locks.scala:48)
> at xsbt.boot.Locks$.apply0(Locks.scala:31)
> at xsbt.boot.Locks$.apply(Locks.scala:28)
> at sbt.IvySbt.withDefaultLogger(Ivy.scala:65)
> at sbt.IvySbt.withIvy(Ivy.scala:128)
> at sbt.IvySbt.withIvy(Ivy.scala:125)
> at sbt.IvySbt$Module.withModule(Ivy.scala:156)
> at sbt.IvyActions$.updateEither(IvyActions.scala:168)
> at
> sbt.Classpaths$$anonfun$sbt$Classpaths$$work$1$1.apply(Defaults.scala:1541)
>
> Thanks,
> Naresh
> www.linkedin.com/in/naresh-dulam
> http://hadoopandspark.blogspot.com/
>
>
>
> On Sat, Jul 27, 2019 at 3:25 PM Nicolas Paris 
> wrote:
>
>> Congrats
>>
>> The read/write feature with hive3 is highly interesting
>>
>> On Fri, Jul 26, 2019 at 06:07:55PM +0530, Abhishek Somani wrote:
>> > Hi All,
>> >
>> > We at Qubole have open sourced a datasource that will enable users to
>> work on
>> > their Hive ACID Transactional Tables using Spark.
>> >
>> > Github: https://github.com/qubole/spark-acid
>> >
>> > Hive ACID tables allow users to work on their data transactionally, and
>> also
>> > gives them the ability to Delete, Update and Merge data efficiently
>> without
>> > having to rewrite all of their data in a table, partition or file. We
>> believe
>> > that being able to work on these tables from Spark is a much desired
>> value add,
>> > as is also apparent in
>> https://issues.apache.org/jira/browse/SPARK-15348 and
>> > https://issues.apache.org/jira/browse/SPARK-16996 with multiple people
>> looking
>> > for it. Currently the datasource supports reading from these ACID
>> tables only,
>> > and we are working on adding the ability to write into these tables via
>> Spark
>> > as well.
>> >
>> > The datasource is also available as a spark package, and instructions
>> on how to
>> > use it are available on the Github page.
>> >
>> > We welcome your feedback and suggestions.
>> >
>> > Thanks,
>> > Abhishek Somani
>>
>> --
>> nicolas
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: New Spark Datasource for Hive ACID tables

2019-07-27 Thread naresh Goud
Hi Abhishek,


We are not able to build the jar from the GitHub code; we get the error below.

Has anyone else been able to build the jars? Is there anything else missing?



Note: Unresolved dependencies path:
[warn]  com.qubole:spark-acid-shaded-dependencies_2.11:0.1
(C:\Data\Hadoop\spark-acid-master\build.sbt#L51-54)
[warn]+- com.qubole:spark-acid_2.11:0.4.0
sbt.ResolveException: unresolved dependency:
com.qubole#spark-acid-shaded-dependencies_2.11;0.1: not found
at sbt.IvyActions$.sbt$IvyActions$$resolve(IvyActions.scala:313)
at
sbt.IvyActions$$anonfun$updateEither$1.apply(IvyActions.scala:191)
at
sbt.IvyActions$$anonfun$updateEither$1.apply(IvyActions.scala:168)
at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:156)
at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:156)
at sbt.IvySbt$$anonfun$withIvy$1.apply(Ivy.scala:133)
at sbt.IvySbt.sbt$IvySbt$$action$1(Ivy.scala:57)
at sbt.IvySbt$$anon$4.call(Ivy.scala:65)
at xsbt.boot.Locks$GlobalLock.withChannel$1(Locks.scala:93)
at
xsbt.boot.Locks$GlobalLock.xsbt$boot$Locks$GlobalLock$$withChannelRetries$1(Locks.scala:78)
at
xsbt.boot.Locks$GlobalLock$$anonfun$withFileLock$1.apply(Locks.scala:97)
at xsbt.boot.Using$.withResource(Using.scala:10)
at xsbt.boot.Using$.apply(Using.scala:9)
at
xsbt.boot.Locks$GlobalLock.ignoringDeadlockAvoided(Locks.scala:58)
at xsbt.boot.Locks$GlobalLock.withLock(Locks.scala:48)
at xsbt.boot.Locks$.apply0(Locks.scala:31)
at xsbt.boot.Locks$.apply(Locks.scala:28)
at sbt.IvySbt.withDefaultLogger(Ivy.scala:65)
at sbt.IvySbt.withIvy(Ivy.scala:128)
at sbt.IvySbt.withIvy(Ivy.scala:125)
at sbt.IvySbt$Module.withModule(Ivy.scala:156)
at sbt.IvyActions$.updateEither(IvyActions.scala:168)
at
sbt.Classpaths$$anonfun$sbt$Classpaths$$work$1$1.apply(Defaults.scala:1541)

Thanks,
Naresh
www.linkedin.com/in/naresh-dulam
http://hadoopandspark.blogspot.com/



On Sat, Jul 27, 2019 at 3:25 PM Nicolas Paris 
wrote:

> Congrats
>
> The read/write feature with hive3 is highly interesting
>
> On Fri, Jul 26, 2019 at 06:07:55PM +0530, Abhishek Somani wrote:
> > Hi All,
> >
> > We at Qubole have open sourced a datasource that will enable users to
> work on
> > their Hive ACID Transactional Tables using Spark.
> >
> > Github: https://github.com/qubole/spark-acid
> >
> > Hive ACID tables allow users to work on their data transactionally, and
> also
> > gives them the ability to Delete, Update and Merge data efficiently
> without
> > having to rewrite all of their data in a table, partition or file. We
> believe
> > that being able to work on these tables from Spark is a much desired
> value add,
> > as is also apparent in https://issues.apache.org/jira/browse/SPARK-15348
>  and
> > https://issues.apache.org/jira/browse/SPARK-16996 with multiple people
> looking
> > for it. Currently the datasource supports reading from these ACID tables
> only,
> > and we are working on adding the ability to write into these tables via
> Spark
> > as well.
> >
> > The datasource is also available as a spark package, and instructions on
> how to
> > use it are available on the Github page.
> >
> > We welcome your feedback and suggestions.
> >
> > Thanks,
> > Abhishek Somani
>
> --
> nicolas
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: New Spark Datasource for Hive ACID tables

2019-07-26 Thread naresh Goud
Thanks Abhishek.

Will it work on a Hive ACID table that is not compacted, i.e. a table having
base and delta files?

Let's say we have a Hive ACID table customer:

CREATE TABLE customer (customer_id int, customer_name string, customer_email string)
CLUSTERED BY (customer_id) INTO 10 BUCKETS
STORED AS ORC
LOCATION '/test/customer'
TBLPROPERTIES ('transactional'='true')


And table hdfs path having below directories

/test/customer/base_15234/
/test/customer/delta_1234_456


That means the table has updates and major compaction has not run.

Will the Spark reader work on it?


Thank you,
Naresh







On Fri, Jul 26, 2019 at 7:38 AM Abhishek Somani 
wrote:

> Hi All,
>
> We at Qubole  have open sourced a datasource
> that will enable users to work on their Hive ACID Transactional Tables
> 
> using Spark.
>
> Github: https://github.com/qubole/spark-acid
>
> Hive ACID tables allow users to work on their data transactionally, and
> also gives them the ability to Delete, Update and Merge data efficiently
> without having to rewrite all of their data in a table, partition or file.
> We believe that being able to work on these tables from Spark is a much
> desired value add, as is also apparent in
> https://issues.apache.org/jira/browse/SPARK-15348 and
> https://issues.apache.org/jira/browse/SPARK-16996 with multiple people
> looking for it. Currently the datasource supports reading from these ACID
> tables only, and we are working on adding the ability to write into these
> tables via Spark as well.
>
> The datasource is also available as a spark package, and instructions on
> how to use it are available on the Github page
> .
>
> We welcome your feedback and suggestions.
>
> Thanks,
> Abhishek Somani
>
-- 
Thanks,
Naresh
www.linkedin.com/in/naresh-dulam
http://hadoopandspark.blogspot.com/


Re: Spark SQL

2019-06-19 Thread naresh Goud
Just to make it more clear: Spark SQL uses the Hive metastore but runs queries
with its own engine; it does not use the Hive execution engine.

Please correct me if it’s not true.



On Mon, Jun 10, 2019 at 2:29 PM Russell Spitzer 
wrote:

> Spark can use the HiveMetastore as a catalog, but it doesn't use the hive
> parser or optimization engine. Instead it uses Catalyst, see
> https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html
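>
> A small illustrative sketch of that split (the database/table name is a
> placeholder): the table lookup goes through the Hive metastore, while
> explain() prints a Catalyst/Spark physical plan rather than a Hive one.
>
> import org.apache.spark.sql.SparkSession
>
> val spark = SparkSession.builder()
>   .appName("metastore-vs-engine")
>   .enableHiveSupport()            // the Hive metastore is used only as the catalog
>   .getOrCreate()
>
> val df = spark.sql("SELECT count(*) FROM some_db.some_table")  // placeholder table
> df.explain()                      // the plan shown is produced by Catalyst, not Hive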
>
> On Mon, Jun 10, 2019 at 2:07 PM naresh Goud 
> wrote:
>
>> Hi Team,
>>
>> Does Spark SQL use the Hive engine to run queries?
>> My understanding is that Spark SQL uses the Hive metastore to get metadata
>> information for running queries.
>>
>> Thank you,
>> Naresh
>> --
>> Thanks,
>> Naresh
>> www.linkedin.com/in/naresh-dulam
>> http://hadoopandspark.blogspot.com/
>>
>> --
Thanks,
Naresh
www.linkedin.com/in/naresh-dulam
http://hadoopandspark.blogspot.com/


Override jars in spark submit

2019-06-19 Thread naresh Goud
Hello All,

How can we override jars in spark-submit?
We have a hive-exec-spark jar that is available as part of the default Spark
cluster jars.
We want to override the above-mentioned jar in spark-submit with a
newer-version jar.
How do we do that?


Thank you,
Naresh
-- 
Thanks,
Naresh
www.linkedin.com/in/naresh-dulam
http://hadoopandspark.blogspot.com/


Spark SQL

2019-06-10 Thread naresh Goud
Hi Team,

Does Spark SQL use the Hive engine to run queries?
My understanding is that Spark SQL uses the Hive metastore to get metadata
information for running queries.

Thank you,
Naresh
-- 
Thanks,
Naresh
www.linkedin.com/in/naresh-dulam
http://hadoopandspark.blogspot.com/


Re: Subscribe Multiple Topics Structured Streaming

2018-09-17 Thread naresh Goud
You can use the statement below to subscribe to multiple topics:

val dfStatus = spark.readStream.
  format("kafka").
  option("subscribe", "utility-status, utility-critical").
  option("kafka.bootstrap.servers", "localhost:9092").
  option("startingOffsets", "earliest")
  .load()
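
If each topic then needs its own handling, one possible follow-up is to split
the single stream on the "topic" column that the Kafka source attaches to every
record. This is a sketch only, continuing from the dfStatus defined above:

import org.apache.spark.sql.functions.col

// Split the multi-topic stream using the "topic" column added by the Kafka source.
val statusStream   = dfStatus.filter(col("topic") === "utility-status")
val criticalStream = dfStatus.filter(col("topic") === "utility-critical")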





On Mon, Sep 17, 2018 at 3:28 AM sivaprakash 
wrote:

> Hi
>
> I have integrated Spark Streaming with Kafka in which Im listening 2 topics
>
> def main(args: Array[String]): Unit = {
>
> val schema = StructType(
>   List(
> StructField("gatewayId", StringType, true),
> StructField("userId", StringType, true)
>   )
> )
>
> val spark = SparkSession
>   .builder
>   .master("local[4]")
>   .appName("DeviceAutomation")
>   .getOrCreate()
>
> val dfStatus = spark.readStream.
>   format("kafka").
>   option("subscribe", "utility-status, utility-critical").
>   option("kafka.bootstrap.servers", "localhost:9092").
>   option("startingOffsets", "earliest")
>   .load()
>
>
>   }
>
> Since I have few more topics to be listed and perform different operations
> I
> would like to move each topics into separate case class for better clarity.
> Is it possible?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
Thanks,
Naresh
www.linkedin.com/in/naresh-dulam
http://hadoopandspark.blogspot.com/


Re: java.nio.file.FileSystemException: /tmp/spark- .._cache : No space left on device

2018-08-19 Thread naresh Goud
Also check that there is enough space available on the /tmp directory.
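
If /tmp itself is simply too small, one option is to point Spark's scratch
space somewhere larger. This is only a hedged sketch: the path is a
placeholder, and on YARN the node manager's local directories usually take
precedence over this setting.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("example")
  .config("spark.local.dir", "/mnt/bigdisk/spark-tmp")  // placeholder path with more space
  .getOrCreate()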

On Fri, Aug 17, 2018 at 10:14 AM Jeevan K. Srivatsa <
jeevansriva...@gmail.com> wrote:

> Hi Venkata,
>
> On a quick glance, it looks like a file-related issue more so than an
> executor issue. If the logs are not that important, I would clear
> /tmp/spark-events/ directory and assign a suitable permission (e.g., chmod
> 755) to that and rerun the application.
>
> chmod 755 /tmp/spark-events/
>
> Thanks and regards,
> Jeevan K. Srivatsa
>
>
> On Fri, 17 Aug 2018 at 15:20, Polisetti, Venkata Siva Rama Gopala Krishna <
> vpolise...@spglobal.com> wrote:
>
>> Hi
>>
>> Am getting below exception when I Run Spark-submit in linux machine , can
>> someone give quick solution with commands
>>
>> Driver stacktrace:
>>
>> - Job 0 failed: count at DailyGainersAndLosersPublisher.scala:145, took
>> 5.749450 s
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 4
>> in stage 0.0 failed 4 times, most recent failure: Lost task 4.3 in stage
>> 0.0 (TID 6, 172.29.62.145, executor 0): java.nio.file.FileSystemException:
>> /tmp/spark-523d5331-3884-440c-ac0d-f46838c2029f/executor-390c9cd7-217e-42f3-97cb-fa2734405585/spark-206d92c0-f0d3-443c-97b2-39494e2c5fdd/-4230744641534510169119_cache
>> -> ./PublishGainersandLosers-1.0-SNAPSHOT-shaded-Gopal.jar: No space left
>> on device
>>
>> at
>> sun.nio.fs.UnixException.translateToIOException(UnixException.java:91)
>>
>> at
>> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>>
>> at sun.nio.fs.UnixCopyFile.copyFile(UnixCopyFile.java:253)
>>
>> at sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:581)
>>
>> at
>> sun.nio.fs.UnixFileSystemProvider.copy(UnixFileSystemProvider.java:253)
>>
>> at java.nio.file.Files.copy(Files.java:1274)
>>
>> at
>> org.apache.spark.util.Utils$.org$apache$spark$util$Utils$$copyRecursive(Utils.scala:625)
>>
>> at org.apache.spark.util.Utils$.copyFile(Utils.scala:596)
>>
>> at org.apache.spark.util.Utils$.fetchFile(Utils.scala:473)
>>
>> at
>> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:696)
>>
>> at
>> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:688)
>>
>> at
>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>>
>> at
>> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>>
>> at
>> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>>
>> at
>> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
>>
>> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>>
>> at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
>>
>> at
>> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>>
>> at org.apache.spark.executor.Executor.org
>> $apache$spark$executor$Executor$$updateDependencies(Executor.scala:688)
>>
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:308)
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>>
> --
Thanks,
Naresh
www.linkedin.com/in/naresh-dulam
http://hadoopandspark.blogspot.com/


Re: Unable to alter partition. The transaction for alter partition did not commit successfully.

2018-05-30 Thread naresh Goud
What are you doing? Please give more details of what you are doing.

On Wed, May 30, 2018 at 12:58 PM Arun Hive 
wrote:

>
> Hi
>
> While running my spark job component i am getting the following exception.
> Requesting for your help on this:
> Spark core version -
> spark-core_2.10-2.1.1
>
> Spark streaming version -
> spark-streaming_2.10-2.1.1
>
> Spark hive version -
> spark-hive_2.10-2.1.1
>
>
> 2018-05-28 00:08:04,317  [streaming-job-executor-2] ERROR (Hive.java:1883)
> - org.apache.hadoop.hive.ql.metadata.HiveException: Unable to alter
> partition. The transaction for alter partition did not commit successfully.
> at org.apache.hadoop.hive.ql.metadata.Hive.alterPartition(Hive.java:573)
> at org.apache.hadoop.hive.ql.metadata.Hive.alterPartition(Hive.java:546)
> at
> org.apache.hadoop.hive.ql.metadata.Hive.alterPartitionSpec(Hive.java:1915)
> at org.apache.hadoop.hive.ql.metadata.Hive.getPartition(Hive.java:1875)
> at org.apache.hadoop.hive.ql.metadata.Hive.loadPartition(Hive.java:1407)
> at
> org.apache.hadoop.hive.ql.metadata.Hive.loadDynamicPartitions(Hive.java:1593)
> at sun.reflect.GeneratedMethodAccessor123.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.spark.sql.hive.client.Shim_v1_2.loadDynamicPartitions(HiveShim.scala:831)
> at
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadDynamicPartitions$1.apply$mcV$sp(HiveClientImpl.scala:693)
> at
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadDynamicPartitions$1.apply(HiveClientImpl.scala:691)
> at
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadDynamicPartitions$1.apply(HiveClientImpl.scala:691)
> at
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:279)
> at
> org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:226)
> at
> org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:225)
> at
> org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:268)
> at
> org.apache.spark.sql.hive.client.HiveClientImpl.loadDynamicPartitions(HiveClientImpl.scala:691)
> at
> org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadDynamicPartitions$1.apply$mcV$sp(HiveExternalCatalog.scala:823)
> at
> org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadDynamicPartitions$1.apply(HiveExternalCatalog.scala:811)
> at
> org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadDynamicPartitions$1.apply(HiveExternalCatalog.scala:811)
> at
> org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)
> at
> org.apache.spark.sql.hive.HiveExternalCatalog.loadDynamicPartitions(HiveExternalCatalog.scala:811)
> at
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:319)
> at
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:221)
> at
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:407)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
> at
> org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:263)
> at
> org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:243)
>
> ... (stack trace elided) ...
> at
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
> at
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
> at
> 

Re: ERROR: Hive on Spark

2018-04-16 Thread naresh Goud
Change your table name in the query to spam.spamdataset instead of spamdataset.
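
For illustration, using the SparkSession already built in the quoted code
below, the query then becomes:

spark.sql("SELECT * FROM spam.spamdataset").show()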

On Sun, Apr 15, 2018 at 2:12 PM Rishikesh Gawade 
wrote:

> Hello there. I am a newbie in the world of Spark. I have been working on a
> Spark Project using Java.
> I have configured Hive and Spark to run on Hadoop.
> As of now i have created a Hive (derby) database on Hadoop HDFS at the
> given location(warehouse location): */user/hive/warehouse *and database
> name as : *spam *(saved as *spam.db* at the aforementioned location).
> I have been trying to read tables in this database in spark to create
> RDDs/DataFrames.
> Could anybody please guide me in how I can achieve this?
> I used the following statements in my Java Code:
>
> SparkSession spark = SparkSession
> .builder()
> .appName("Java Spark Hive Example").master("yarn")
> .config("spark.sql.warehouse.dir","/user/hive/warehouse")
> .enableHiveSupport()
> .getOrCreate();
> spark.sql("USE spam");
> spark.sql("SELECT * FROM spamdataset").show();
>
> After this i built the project using Maven as follows: mvn clean package
> -DskipTests and a JAR was generated.
>
> After this, I tried running the project via spark-submit CLI using :
>
> spark-submit --class com.adbms.SpamFilter --master yarn
> ~/IdeaProjects/mlproject/target/mlproject-1.0-SNAPSHOT.jar
>
> and got the following error:
>
> Exception in thread "main"
> org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException: Database
> 'spam' not found;
> at org.apache.spark.sql.catalyst.catalog.SessionCatalog.org
> $apache$spark$sql$catalyst$catalog$SessionCatalog$$requireDbExists(SessionCatalog.scala:174)
> at
> org.apache.spark.sql.catalyst.catalog.SessionCatalog.setCurrentDatabase(SessionCatalog.scala:256)
> at
> org.apache.spark.sql.execution.command.SetDatabaseCommand.run(databases.scala:59)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
> at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
> at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
> at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
> at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
> at org.apache.spark.sql.Dataset.(Dataset.scala:190)
> at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)
> at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:638)
> at com.adbms.SpamFilter.main(SpamFilter.java:54)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:879)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> Also, I replaced the SQL query with "SHOW DATABASES", and it showed only
> one database namely "default". Those stored on HDFS warehouse dir weren't
> shown.
>
> I request you to please check this and if anything is wrong then please
> suggest an ideal way to read Hive tables on Hadoop in Spark using Java. A
> link to a webpage having relevant info would also be appreciated.
> Thank you in anticipation.
> Regards,
> Rishikesh Gawade
>
> --
Thanks,
Naresh
www.linkedin.com/in/naresh-dulam
http://hadoopandspark.blogspot.com/


Re: [Spark sql]: Re-execution of same operation takes less time than 1st

2018-04-03 Thread naresh Goud
Whenever Spark reads data, it keeps it in executor memory until there is no
room left for new data being read or processed. This is the beauty of Spark.
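
If you don't want the speedup to depend on what happens to remain in memory,
you can make the reuse explicit. A rough sketch (the paths are placeholders):

// Cache both inputs explicitly so the second run reads from memory, not from disk.
val big   = spark.read.parquet("/data/150m-rows")   // placeholder path
val small = spark.read.parquet("/data/10m-rows")    // placeholder path
big.cache(); small.cache()

val common = big.intersect(small)
common.count()   // first action populates the cache; repeated actions reuse it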


On Tue, Apr 3, 2018 at 12:42 AM snjv  wrote:

> Hi,
>
> When we execute the same operation twice, spark takes less time ( ~40%)
> than
> the first.
> Our operation is like this:
> Read 150M rows ( spread in multiple parquet files) into DF
> Read 10M rows ( spread in multiple parquet files) into other DF.
> Do an intersect operation.
>
> Size of 150M row file: 587MB
> size of 10M file: 50M
>
> If first execution takes around 20 sec the next one will take just 10-12
> sec.
> Any specific reason for this? Is any optimization is there that we can
> utilize during the first operation?
>
> Regards
> Sanjeev
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
Thanks,
Naresh
www.linkedin.com/in/naresh-dulam
http://hadoopandspark.blogspot.com/


Re: How does extending an existing parquet with columns affect impala/spark performance?

2018-04-03 Thread naresh Goud
From Spark's point of view it shouldn't have an effect. It's possible to extend
the columns of new Parquet files; it won't affect performance and does not
require changing the Spark application code.
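
If old and new Parquet files end up being read together, the differing schemas
can be reconciled at read time. A small sketch (the path is a placeholder):

// Older files simply lack the new columns; with schema merging they come back as nulls.
val events = spark.read
  .option("mergeSchema", "true")
  .parquet("/warehouse/events")   // placeholder path
events.printSchema()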



On Tue, Apr 3, 2018 at 9:14 AM Vitaliy Pisarev 
wrote:

> This is not strictly a spark question but I'll give it a shot:
>
> have an existing setup of parquet files that are being queried from impala
> and from spark.
>
> I intend to add some 30 relatively 'heavy' columns to the parquet. Each
> column would store an array of structs. Each struct can have from 5 to 20
> fields. An array may have a couple of thousands of structs.
>
> Theoretically, parquet being a columnar storage- extending it with columns
> should not affect performance of *existing* queries (since they are not
> touching these columns).
>
>- Is this premise correct?
>- What should I watch out for doing this move?
>- In general, what are the considerations when deciding on the "width"
>(i.e amount of columns) of a parquet file?
>
>
> --
Thanks,
Naresh
www.linkedin.com/in/naresh-dulam
http://hadoopandspark.blogspot.com/


Re: java.lang.UnsupportedOperationException: CSV data source does not support struct/ERROR RetryingBlockFetcher

2018-03-27 Thread naresh Goud
When storing as a Parquet file, I don't think it requires the header option:
option("header","true")

Give it a try by removing the header option and then try to read it. I haven't
tried it myself; just a thought.
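
A minimal sketch of reading the saved output back to look at it, using the
"output" directory written by the save() call in the quoted message below
(flattening any struct columns is left out):

val saved = spark.read.parquet("output")   // directory produced by the save() above
saved.printSchema()
saved.show(10, truncate = false)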

Thank you,
Naresh


On Tue, Mar 27, 2018 at 9:47 PM Mina Aslani  wrote:

> Hi,
>
>
> I am using pyspark. To transform my sample data and create model, I use
> stringIndexer and OneHotEncoder.
>
>
> However, when I try to write data as csv using below command
>
>
> df.coalesce(1).write.option("header","true").mode("overwrite").csv("output.csv")
>
>
> I get UnsupportedOperationException
>
> java.lang.UnsupportedOperationException: CSV data source does not support
> struct data
> type.
>
> Therefore, to save data and avoid getting the error I use
>
>
>
> df.coalesce(1).write.option("header","true").mode("overwrite").save("output")
>
>
> The above command saves data but it's in parquet format.
> How can I read parquet file and convert to csv to observe the data?
>
> When I use
>
> df = spark.read.parquet("1.parquet"), it throws:
>
> ERROR RetryingBlockFetcher: Exception while beginning fetch of 1
> outstanding blocks
>
> Your input is appreciated.
>
>
> Best regards,
>
> Mina
>
>
>
> --
Thanks,
Naresh
www.linkedin.com/in/naresh-dulam
http://hadoopandspark.blogspot.com/


Re: is there a way to catch exceptions on executor level

2018-03-10 Thread naresh Goud
How about accumulators?
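
A rough sketch of that idea (spark, df and process are hypothetical names, not
from the original code): collect per-row error messages in a
CollectionAccumulator on the executors and read them back on the driver.

import org.apache.spark.sql.Row

val errors = spark.sparkContext.collectionAccumulator[String]("rowErrors")

df.rdd.foreach { row: Row =>
  try {
    process(row)                                   // hypothetical per-row logic
  } catch {
    case e: Exception => errors.add(s"failed row $row: ${e.getMessage}")
  }
}

println(errors.value)                              // inspect collected errors on the driver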


Thanks,
Naresh
www.linkedin.com/in/naresh-dulam
http://hadoopandspark.blogspot.com/



On Thu, Mar 8, 2018 at 12:07 AM Chethan Bhawarlal <
cbhawar...@collectivei.com> wrote:

> Hi Dev,
>
> I am doing spark operations on Rdd level for each row like this,
>
>  private def obj(row: org.apache.spark.sql.Row): Put = {
>
>
>
> row.schema.fields.foreach(x => {
>
>   x.dataType match {
>
>case (StringType)=> //some operation
>
>
> so, when I get some empty or garbage value my code fails and I am not able
> to catch the exceptions as these failures are occurring at executors.
>
>
> is there a way I can catch these exceptions and accumulate them and print
> to driver logs?
>
>
> any sample examples provided will be of great help.
>
>
> Thanks,
>
> Chethan.
>
>
>


Re: Reading kafka and save to parquet problem

2018-03-07 Thread naresh Goud
Change it to readStream instead of read, as below:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()


Check if this is helpful:

https://github.com/ndulam/KafkaSparkStreams/blob/master/SampleStreamApp/src/main/scala/com/naresh/org/SensorDataSave.scala
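
Continuing from the df defined above, a hedged sketch of the write side (paths
are placeholders): a streaming query, unlike a batch write, keeps running and
needs a checkpoint location.

val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("parquet")
  .option("path", "hdfs:///data/parquet")              // placeholder output path
  .option("checkpointLocation", "hdfs:///data/checkpoints")  // placeholder checkpoint path
  .start()
query.awaitTermination()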








Thanks,
Naresh
www.linkedin.com/in/naresh-dulam
http://hadoopandspark.blogspot.com/



On Wed, Mar 7, 2018 at 7:33 PM Junfeng Chen  wrote:

> I am struggling in trying to read data in kafka and save them to parquet
> file on hdfs by using spark streaming according to this post
> https://stackoverflow.com/questions/45827664/read-from-kafka-and-write-to-hdfs-in-parquet
>
> My code is similar to  following
>
> val df = spark
>   .read
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>   .option("subscribe", "topic1")
>   .load()
> df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>   .as[(String, String)]
>
>   .write.parquet("hdfs://data.parquet")
>
>
> The only difference is that I am writing in Java.
>
> But in practice, this code just runs once and then exits gracefully.
> Although it produces the parquet file successfully and no exception is
> thrown, it runs like a normal Spark program rather than a Spark
> streaming program.
>
> What should I do if I want to read from Kafka and save the data to Parquet in batches?
>
> Regard,
> Junfeng Chen
>


Re: How does Spark Structured Streaming determine an event has arrived late?

2018-02-27 Thread naresh Goud
Hi Kant,

TD's explanation makes a lot of sense. Refer to this Stack Overflow question,
where it was explained with program output. Hope this helps.

https://stackoverflow.com/questions/45579100/structured-streaming-watermark-vs-exactly-once-semantics




Thanks,
Naresh
www.linkedin.com/in/naresh-dulam
http://hadoopandspark.blogspot.com/


On Tue, Feb 27, 2018 at 7:45 PM, Tathagata Das 
wrote:

> Let me answer the original question directly, that is, how do we determine
> that an event is late. We simply track the maximum event time the engine
> has seen in the data it has processed till now. And any data that has event
> time less than the max is basically "late" (as it is out-of-order). Now, in
> a distributed setting, it is very hard define to whether each record is
> late or not, because it is hard to have a consistent definition of
> max-event-time-seen. Fortunately, we dont have to do this precisely because
> we dont really care whether a record is "late"; we only care whether a
> record is "too late", that is, older than the watermark =
> max-event-time-seen - watermark-delay). As the programming guide says, if
> data is "late" but not "too late" we process it in the same way as non-late
> data. Only when the data is "too late" do we drop it.
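>
> As a small illustrative sketch of this (events stands for any streaming
> DataFrame with an eventTime column; the intervals are arbitrary):
>
> import org.apache.spark.sql.functions.{col, window}
>
> // Rows whose eventTime is older than max(eventTime) - 10 minutes are "too late" and dropped.
> val counts = events
>   .withWatermark("eventTime", "10 minutes")
>   .groupBy(window(col("eventTime"), "5 minutes"))
>   .count()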
>
> To further clarify, we do not in any way to correlate processing-time with
> event-time. The definition of lateness is only based on event-time and has
> nothing to do with processing-time. This allows us to do event-time
> processing with old data streams as well. For example, you may replay
> 1-week old data as a stream, and the processing will be exactly the same as
> it would have been if you had processed the stream in real-time a week ago.
> This is fundamentally necessary for achieving the deterministic processing
> that Structured Streaming guarantees.
>
> Regarding the picture, the "time" is actually "event-time". My apologies
> for not making this clear in the picture. In hindsight, the picture can be
> made much better.  :)
>
> Hope this explanation helps!
>
> TD
>
> On Tue, Feb 27, 2018 at 2:26 AM, kant kodali  wrote:
>
>> I read through the spark structured streaming documentation and I wonder
>> how does spark structured streaming determine an event has arrived late?
>> Does it compare the event-time with the processing time?
>>
>> [image: enter image description here]
>> 
>>
>> Taking the above picture as an example Is the bold right arrow line
>> "Time" represent processing time? If so
>>
>> 1) where does this processing time come from? since its streaming Is it
>> assuming someone is likely using an upstream source that has processing
>> timestamp in it or spark adds a processing timestamp field? For example,
>> when reading messages from Kafka we do something like
>>
>> Dataset kafkadf = spark.readStream().forma("kafka").load()
>>
>> This dataframe has timestamp column by default which I am assuming is the
>> processing time. correct? If so, Does Kafka or Spark add this timestamp?
>>
>> 2) I can see there is a time comparison between bold right arrow line and
>> time in the message. And is that how spark determines an event is late?
>>
>
>


Re: Out of memory Error when using Collection Accumulator Spark 2.2

2018-02-26 Thread naresh Goud
what is your driver memory?

Thanks,
Naresh
www.linkedin.com/in/naresh-dulam
http://hadoopandspark.blogspot.com/


On Mon, Feb 26, 2018 at 3:45 AM, Patrick  wrote:

> Hi,
>
> We were getting OOM error when we are accumulating the results of each
> worker. We were trying to avoid collecting data to driver node instead used
> accumulator as per below code snippet,
>
> Is there any spark config to set the accumulator settings Or am i doing
> the wrong way to collect the huge data set?
>
>   CollectionAccumulator accumulate;
>   Dataset bin;
>
> bin.foreach((ForeachFunction) row -> {
>   accumulate.add(row.get(0) + "|" + row.get(1) + "|" + row.get(2));
> });
>
> accumulate.value().forEach(element -> {
>   String[] arr = element.split("\\|");
>   String count = arr[2];
>   double percentage =
>   (total == 0.0) ? 0.0 : (Double.valueOf(count) / total);
>   PayloadBin payload = new PayloadBin(arr[0],
>   arr[1], 0, Long.valueOf(count), percentage, sortBy, sortOrder);
>   binArray.add(payload);
>
> });
>
>
> 18/02/21 17:35:23 INFO storage.BlockManagerInfo: Added taskresult_5050 in
> memory on rhlhddfrd225.fairisaac.com:41640 (size: 3.7 MB, free: 8.3 GB)
>
> 18/02/21 17:35:24 INFO storage.BlockManagerInfo: Removed taskresult_5034
> on rhlhddfrd218.fairisaac.com:46584 in memory (size: 3.7 MB, free: 8.4 GB)
>
> 18/02/21 17:35:25 INFO scheduler.TaskSetManager: Finished task 59.0 in
> stage 20.0 (TID 5034) in 9908 ms on rhlhddfrd218.fairisaac.com (executor
> 92) (14/200)
>
> Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError:
> Java heap space
>
> at java.util.Arrays.copyOf(Arrays.java:3181)
>
> at java.util.ArrayList.toArray(ArrayList.java:376)
>
> at java.util.Collections$SynchronizedCollection.
> toArray(Collections.java:2024)
>
> at java.util.ArrayList.(ArrayList.java:177)
>
> at org.apache.spark.util.CollectionAccumulator.value(
> AccumulatorV2.scala:470)
>
>


Re: partitionBy with partitioned column in output?

2018-02-26 Thread naresh Goud
Does this help? Duplicating the partition column keeps a copy of it in the
written data:

import org.apache.spark.sql.functions.col

sc.parallelize(List((1, 10), (2, 20))).toDF("foo", "bar")
  .withColumn("foo_out", col("foo")) // copy of the partition column that stays in the files
  .write.partitionBy("foo").json("json-out")


On Mon, Feb 26, 2018 at 4:28 PM, Alex Nastetsky 
wrote:

> Is there a way to make outputs created with "partitionBy" to contain the
> partitioned column? When reading the output with Spark or Hive or similar,
> it's less of an issue because those tools know how to perform partition
> discovery. But if I were to load the output into an external data warehouse
> or database, it would have no idea.
>
> Example below -- a dataframe with two columns "foo" and "bar" is
> partitioned by "foo", but the data only contains "bar", since it expects
> the reader to know how to derive the value of "foo" from the parent
> directory. Note that it's the same thing with Parquet and Avro as well, I
> just chose to use JSON in my example.
>
> scala> sc.parallelize(List((1,10),(2,20))).toDF("foo","bar").write.
> partitionBy("foo").json("json-out")
>
>
> $ ls json-out/
> foo=1  foo=2  _SUCCESS
> $ cat json-out/foo=1/part-3-18ca93d0-c3b1-424b-8ad5-291d8a29523b.json
> {"bar":10}
> $ cat json-out/foo=2/part-7-18ca93d0-c3b1-424b-8ad5-291d8a29523b.json
> {"bar":20}
>
> Thanks,
> Alex.
>


Re: Trigger.ProcessingTime("10 seconds") & Trigger.Continuous(10.seconds)

2018-02-26 Thread naresh Goud
Thanks, I'll check it out.

On Mon, Feb 26, 2018 at 12:11 AM Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> The continuous one is our new low latency continuous processing engine in
> Structured Streaming (to be released in 2.3).
> Here is the pre-release doc -
> https://dist.apache.org/repos/dist/dev/spark/v2.3.0-rc5-docs/_site/structured-streaming-programming-guide.html#continuous-processing
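>
> As a side-by-side sketch (df stands for any streaming DataFrame; the console
> sink is used only for illustration):
>
> import org.apache.spark.sql.streaming.Trigger
>
> // Micro-batch mode: a batch is planned and executed every 10 seconds.
> val microBatch = df.writeStream.format("console")
>   .trigger(Trigger.ProcessingTime("10 seconds"))
>   .start()
>
> // Continuous mode (2.3+): long-running tasks; the interval is the checkpoint
> // interval rather than a batch interval.
> val continuous = df.writeStream.format("console")
>   .trigger(Trigger.Continuous("10 seconds"))
>   .start()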
>
> On Sun, Feb 25, 2018 at 12:26 PM, naresh Goud <nareshgoud.du...@gmail.com>
> wrote:
>
>> Hello Spark Experts,
>>
>> What is the difference between Trigger.Continuous(10.seconds) and
>> Trigger.ProcessingTime("10 seconds") ?
>>
>>
>>
>> Thank you,
>> Naresh
>>
>
>


Re: Spark structured streaming: periodically refresh static data frame

2018-02-25 Thread naresh Goud
Appu,

I have also landed in the same problem.

Were you able to solve this issue? Could you please share a snippet of the code
if you were able to?

Thanks,
Naresh

On Wed, Feb 14, 2018 at 8:04 PM, Tathagata Das 
wrote:

> 1. Just loop like this.
>
>
> def startQuery(): Streaming Query = {
>// Define the dataframes and start the query
> }
>
> // call this on main thread
> while (notShutdown) {
>val query = startQuery()
>query.awaitTermination(refreshIntervalMs)
>query.stop()
>// refresh static data
> }
>
>
> 2. Yes, stream-stream joins in 2.3.0, soon to be released. RC3 is
> available if you want to test it right now - https://dist.apache.org/
> repos/dist/dev/spark/v2.3.0-rc3-bin/.
>
>
>
> On Wed, Feb 14, 2018 at 3:34 AM, Appu K  wrote:
>
>> TD,
>>
>> Thanks a lot for the quick reply :)
>>
>>
>> Did I understand it right that in the main thread, to wait for the
>> termination of the context I'll not be able to use
>>  outStream.awaitTermination()  -  [ since i'll be closing in inside another
>> thread ]
>>
>> What would be a good approach to keep the main app long running if I’ve
>> to restart queries?
>>
>> Should i just wait for 2.3 where i'll be able to join two structured
>> streams ( if the release is just a few weeks away )
>>
>> Appreciate all the help!
>>
>> thanks
>> App
>>
>>
>>
>> On 14 February 2018 at 4:41:52 PM, Tathagata Das (
>> tathagata.das1...@gmail.com) wrote:
>>
>> Let me fix my mistake :)
>> What I suggested in that earlier thread does not work. The streaming
>> query that joins a streaming dataset with a batch view, does not correctly
>> pick up when the view is updated. It works only when you restart the query.
>> That is,
>> - stop the query
>> - recreate the dataframes,
>> - start the query on the new dataframe using the same checkpoint location
>> as the previous query
>>
>> Note that you dont need to restart the whole process/cluster/application,
>> just restart the query in the same process/cluster/application. This should
>> be very fast (within a few seconds). So, unless you have latency SLAs of 1
>> second, you can periodically restart the query without restarting the
>> process.
>>
>> Apologies for my misdirections in that earlier thread. Hope this helps.
>>
>> TD
>>
>> On Wed, Feb 14, 2018 at 2:57 AM, Appu K  wrote:
>>
>>> More specifically,
>>>
>>> Quoting TD from the previous thread
>>> "Any streaming query that joins a streaming dataframe with the view will
>>> automatically start using the most updated data as soon as the view is
>>> updated”
>>>
>>> Wondering if I’m doing something wrong in  https://gist.github.com/anony
>>> mous/90dac8efadca3a69571e619943ddb2f6
>>>
>>> My streaming dataframe is not using the updated data, even though the
>>> view is updated!
>>>
>>> Thank you
>>>
>>>
>>> On 14 February 2018 at 2:54:48 PM, Appu K (kut...@gmail.com) wrote:
>>>
>>> Hi,
>>>
>>> I had followed the instructions from the thread https://mail-archives.a
>>> pache.org/mod_mbox/spark-user/201704.mbox/%3CD1315D33-41CD-4
>>> ba3-8b77-0879f3669...@qvantel.com%3E while trying to reload a static
>>> data frame periodically that gets joined to a structured streaming query.
>>>
>>> However, the streaming query results does not reflect the data from the
>>> refreshed static data frame.
>>>
>>> Code is here https://gist.github.com/anonymous/90dac8efadca3a69571e6
>>> 19943ddb2f6
>>>
>>> I’m using spark 2.2.1 . Any pointers would be highly helpful
>>>
>>> Thanks a lot
>>>
>>> Appu
>>>
>>>
>>
>


Trigger.ProcessingTime("10 seconds") & Trigger.Continuous(10.seconds)

2018-02-25 Thread naresh Goud
Hello Spark Experts,

What is the difference between Trigger.Continuous(10.seconds) and
Trigger.ProcessingTime("10 seconds") ?



Thank you,
Naresh


Re: Consuming Data in Parallel using Spark Streaming

2018-02-22 Thread naresh Goud
Here is my understanding; I hope it gives some idea of how this works. It might
be wrong, so please excuse any mistakes; I am trying to derive the execution
model from my own understanding. Sorry for the long email.

The driver keeps polling Kafka for the latest offset of each topic and then
schedules jobs with the offsets pulled from the topic metadata.

Here a job is (processing logic1 + logic2 + logic3).
These logics are executed only sequentially, as defined in your application
code, on the executor.

Whenever a job gets started, it is started as one transaction, which includes
the following activities:
Transaction
{
Get data from Kafka.
Execute logic1 -> logic2 -> logic3
Update processed record offset information
}

Having said that, coming to your approach for parallel processing:
if you pass three topics to a single createDirectStream, Spark polls once to
get the offsets of all topics, instead of three polls when you create them
with separate createDirectStream calls.

With the above-mentioned approach, the job is scheduled as below:
Job {
   Logic1 with its topic offsets
   Logic2 with its topic offsets
   Logic3 with its topic offsets
}

With this approach the logics are also executed sequentially.

Let's come to your last point of differentiating the data somehow. I am
assuming your application logic is as below, and the scheduled job would look
like this:

Job {
  if (topic1 record) { execute logic1 }
  if (topic2 record) { execute logic2 }
  if (topic3 record) { execute logic3 }
}

This also leads to sequential execution.


Distributed systems are not designed to execute parts of a job in parallel;
instead, they execute the whole job in parallel across partitions of the data.

To summarize, parallelism is possible within the processing of each topic, not
across the processing of different topics. For example, if a topic has 3
partitions, then 3 executors run the job in parallel.





On Thu, Feb 22, 2018 at 9:44 PM Vibhakar, Beejal <
beejal.vibha...@fisglobal.com> wrote:

> Naresh – Thanks for taking out time to respond.
>
>
>
> So is it right to say that it's the driver program which, every 30 seconds,
> tells the executors (which manage the streams) to run, rather than each
> executor making that decision itself? And does this really make it
> sequential execution in my case?
>
>
>
> BTW, do you think the following would be a more suitable way to run this in
> parallel?
>
>
>
>- Right now I am creating 3 DataStream, one for each entity using
>KafkaUtils.createDirectStream API
>- While creating each DataStream, I pass on a single Kafka topic
>- Instead of creating 3 DataStream if I create a single DataStream and
>pass on multiple Kafka topics (TOPIC1, TOPIC2, TOPIC3)  to it, it should be
>able to parallelize the processing (We just need to allocate right number
>of executors)
>- To have separate processing logic for each entity, I just need some
>way to differentiate records of one type of entity from other type of
>entities.
>
>
>
> -Beejal
>
>
>
> From: naresh Goud [mailto:nareshgoud.du...@gmail.com]
> Sent: Friday, February 23, 2018 8:56 AM
> To: Vibhakar, Beejal <beejal.vibha...@fisglobal.com>
> Subject: Re: Consuming Data in Parallel using Spark Streaming
>
>
>
> You will have the same behavior both locally and on a Hadoop cluster,
> since there will be only one streaming context in the driver, which runs in a
> single JVM.
>
>
>
> On Wed, Feb 21, 2018 at 9:12 PM, Vibhakar, Beejal <
> beejal.vibha...@fisglobal.com> wrote:
>
> I am trying to process data from 3 different Kafka topics using 3
> InputDStream with a single StreamingContext. I am currently testing this
> under Sandbox where I see data processed from one Kafka topic followed by
> other.
>
>
>
> Question #1: I want to understand that when I run this program in a Hadoop
> cluster, will it process the data in parallel from 3 Kafka topics, or will I
> see the same behavior as I see in my Sandbox?
>
>
>
> Question #2: I aim to process the data from all three Kafka topics in
> parallel. Can I achieve this without breaking this program into 3 separate
> smaller programs?
>
>
>
> Here's how the code template looks:
>
>
>
>    val ssc = new StreamingContext(sc, Seconds(30))
>
>    val topic1 = Array("TOPIC1")
>
>    val dataStreamTopic1 = KafkaUtils.createDirectStream[Array[Byte],
> GenericRecord](
>       ssc,
>       PreferConsistent,
>       Subscribe[Array[Byte], GenericRecord](topic1, kafkaParms))
>
>    // Processing logic for dataStreamTopic1
>
>    val topic2 = Array("TOPIC2")
>
>    val dataStreamTopic2 = KafkaUtils.createDirectStream[Array[Byte],

Re: Spark not releasing shuffle files in time (with very large heap)

2018-02-22 Thread naresh Goud
Got it. I understood the issue in a different way.



On Thu, Feb 22, 2018 at 9:19 PM Keith Chapman <keithgchap...@gmail.com>
wrote:

> My issue is that there is not enough pressure on GC, hence GC is not
> kicking in fast enough to delete the shuffle files of previous iterations.
>
> Regards,
> Keith.
>
> http://keith-chapman.com
>
> On Thu, Feb 22, 2018 at 6:58 PM, naresh Goud <nareshgoud.du...@gmail.com>
> wrote:
>
>> It would be very difficult to tell without knowing what your application
>> code is doing and what kind of transformations/actions it performs.
>> From my previous experience, tuning application code to avoid unnecessary
>> objects reduces pressure on GC.
>>
>>
>> On Thu, Feb 22, 2018 at 2:13 AM, Keith Chapman <keithgchap...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I'm benchmarking a spark application by running it for multiple
>>> iterations, its a benchmark thats heavy on shuffle and I run it on a local
>>> machine with a very large hear (~200GB). The system has a SSD. When running
>>> for 3 to 4 iterations I get into a situation that I run out of disk space
>>> on the /tmp directory. On further investigation I was able to figure out
>>> that the reason for this is that the shuffle files are still around,
>>> because I have a very large hear GC has not happen and hence the shuffle
>>> files are not deleted. I was able to confirm this by lowering the heap size
>>> and I see GC kicking in more often and the size of /tmp stays under
>>> control. Is there any way I could configure spark to handle this issue?
>>>
>>> One option that I have is to have GC run more often by
>>> setting spark.cleaner.periodicGC.interval to a much lower value. Is there a
>>> cleaner solution?
>>>
>>> Regards,
>>> Keith.
>>>
>>> http://keith-chapman.com
>>>
>>
>>
>


Re: Spark not releasing shuffle files in time (with very large heap)

2018-02-22 Thread naresh Goud
It would be very difficult to tell without knowing what your application code
is doing and what kind of transformations/actions it performs. From my
previous experience, tuning application code to avoid unnecessary objects
reduces pressure on GC.


On Thu, Feb 22, 2018 at 2:13 AM, Keith Chapman 
wrote:

> Hi,
>
> I'm benchmarking a spark application by running it for multiple
> iterations, its a benchmark thats heavy on shuffle and I run it on a local
> machine with a very large hear (~200GB). The system has a SSD. When running
> for 3 to 4 iterations I get into a situation that I run out of disk space
> on the /tmp directory. On further investigation I was able to figure out
> that the reason for this is that the shuffle files are still around,
> because I have a very large hear GC has not happen and hence the shuffle
> files are not deleted. I was able to confirm this by lowering the heap size
> and I see GC kicking in more often and the size of /tmp stays under
> control. Is there any way I could configure spark to handle this issue?
>
> One option that I have is to have GC run more often by
> setting spark.cleaner.periodicGC.interval to a much lower value. Is there
> a cleaner solution?
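>
> For illustration only, a small sketch of setting that interval lower than
> the default when building the session:
>
> import org.apache.spark.sql.SparkSession
>
> val spark = SparkSession.builder()
>   .config("spark.cleaner.periodicGC.interval", "5min")  // lower than the 30min default
>   .getOrCreate()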
>
> Regards,
> Keith.
>
> http://keith-chapman.com
>


Re: Return statements aren't allowed in Spark closures

2018-02-22 Thread naresh Goud
Even I am not able to reproduce the error.

On Thu, Feb 22, 2018 at 2:51 AM, Michael Artz 
wrote:

> I am not able to reproduce your error. You should do something before you
> do that last function and maybe get some more help from the exception it
> returns. Like just add a csv.show (1) on the line before.  Also, can you
> post the different exception when you took out the "return" value like when
> Bryan suggested?
>
> It's getting to this bit of code
>
> private[spark] class ReturnStatementInClosureException
>   extends SparkException("Return statements aren't allowed in Spark closures")
>
> private class ReturnStatementFinder extends ClassVisitor(ASM5) {
>   override def visitMethod(access: Int, name: String, desc: String,
>   sig: String, exceptions: Array[String]): MethodVisitor = {
> if (name.contains("apply")) {
>   new MethodVisitor(ASM5) {
> override def visitTypeInsn(op: Int, tp: String) {
>   if (op == NEW && 
> tp.contains("scala/runtime/NonLocalReturnControl")) {
> throw new ReturnStatementInClosureException
>   }
> }
>   }
> } else {
>   new MethodVisitor(ASM5) {}
> }
>   }
> }
>
> and it must see the NonLocalReturnControl exception. My first guess is
> that the "queryYahoo" function is doing something that is causing an
> exception, but for some reason (Networking thing maybe?) it works ok in
> spark-shell.
>
> On Feb 21, 2018 10:47 PM, "Lian Jiang"  wrote:
>
>> Sorry Bryan. Unfortunately, this is not the root cause.
>>
>> Any other ideas? This is blocking my scenario. Thanks.
>>
>> On Wed, Feb 21, 2018 at 4:26 PM, Bryan Jeffrey 
>> wrote:
>>
>>> Lian,
>>>
>>> You're writing Scala. Just remove the 'return'. No need for it in Scala.
>>>
>>>
>>> --
>>> *From:* Lian Jiang 
>>> *Sent:* Wednesday, February 21, 2018 4:16:08 PM
>>> *To:* user
>>> *Subject:* Return statements aren't allowed in Spark closures
>>>
>>> I can run below code in spark-shell using yarn client mode.
>>>
>>> val csv = spark.read.option("header", "true").csv("my.csv")
>>>
>>> def queryYahoo(row: Row) : Int = { return 10; }
>>>
>>> csv.repartition(5).rdd.foreachPartition{ p => p.foreach(r => { queryYahoo(r) })}
>>>
>>> However, the same code failed when run using spark-submit in yarn client
>>> or cluster mode due to error:
>>>
>>> 18/02/21 21:00:12 ERROR ApplicationMaster: User class threw exception:
>>> org.apache.spark.util.ReturnStatementInClosureException: Return statements
>>> aren't allowed in Spark closures
>>>
>>> org.apache.spark.util.ReturnStatementInClosureException: Return statements
>>> aren't allowed in Spark closures
>>>
>>> at org.apache.spark.util.ReturnStatementFinder$$anon$1.visitTypeInsn(ClosureCleaner.scala:371)
>>> at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
>>> at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
>>> at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
>>> at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
>>> at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:243)
>>> at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$22.apply(ClosureCleaner.scala:306)
>>> at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$22.apply(ClosureCleaner.scala:292)
>>> at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>>> at scala.collection.immutable.List.foreach(List.scala:381)
>>> at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>>> at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:292)
>>> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
>>> at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
>>> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:925)
>>> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
>>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>>> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
>>>
>>>
>>> Any idea? Thanks.
>>>
>>
>>


Re: KafkaUtils.createStream(..) is removed for API

2018-02-18 Thread naresh Goud
Thanks Ted.

I see createDirectStream is experimental, as it is annotated with
"org.apache.spark.annotation.Experimental".

Is it possible that this API will be removed in the future? We want to use
it in one of our production jobs and are afraid it will not be supported
going forward.
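
For reference, here is a minimal sketch of the 0-10 createDirectStream usage
we are talking about, roughly following the kafka-0-10 integration pattern;
the app name, broker address, group id and topic name below are made up:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

val conf = new SparkConf().setAppName("kafka-direct-example")
val ssc = new StreamingContext(conf, Seconds(10))

// Consumer configuration; the values below are placeholders.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("myTopic"), kafkaParams)
)

stream.map(record => (record.key, record.value)).print()

ssc.start()
ssc.awaitTermination()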

Thank you,
Naresh




On Sun, Feb 18, 2018 at 7:47 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> createStream() is still in
> external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
> but it is not in
> external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
>
> FYI
>
> On Sun, Feb 18, 2018 at 5:17 PM, naresh Goud <nareshgoud.du...@gmail.com>
> wrote:
>
>> Hello Team,
>>
>> I see "KafkaUtils.createStream() " method not available in spark 2.2.1.
>>
>> Can someone please confirm if these methods are removed?
>>
>> below is my pom.xml entries.
>>
>>
>> 
>> <properties>
>>   <scala.version>2.11.8</scala.version>
>>   <scala.tools.version>2.11</scala.tools.version>
>> </properties>
>>
>> <dependencies>
>>   <dependency>
>>     <groupId>org.apache.spark</groupId>
>>     <artifactId>spark-streaming_${scala.tools.version}</artifactId>
>>     <version>2.2.1</version>
>>     <scope>provided</scope>
>>   </dependency>
>>   <dependency>
>>     <groupId>org.apache.spark</groupId>
>>     <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
>>     <version>2.2.1</version>
>>     <scope>provided</scope>
>>   </dependency>
>>   <dependency>
>>     <groupId>org.apache.spark</groupId>
>>     <artifactId>spark-core_2.11</artifactId>
>>     <version>2.2.1</version>
>>     <scope>provided</scope>
>>   </dependency>
>> </dependencies>
>>
>>
>>
>>
>>
>> Thank you,
>> Naresh
>>
>
>


KafkaUtils.createStream(..) is removed for API

2018-02-18 Thread naresh Goud
Hello Team,

I see "KafkaUtils.createStream() " method not available in spark 2.2.1.

Can someone please confirm if these methods are removed?

below is my pom.xml entries.



<properties>
  <scala.version>2.11.8</scala.version>
  <scala.tools.version>2.11</scala.tools.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_${scala.tools.version}</artifactId>
    <version>2.2.1</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.2.1</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.1</version>
    <scope>provided</scope>
  </dependency>
</dependencies>





Thank you,
Naresh


Re: Issue with Cast in Spark Sql

2018-01-30 Thread naresh Goud
Spark/Hive converts the value to null if the decimal type you cast to cannot
hold the value in the file. The example below gives you the details; I am not
sure why it silently converts to null.
Note: you need to trim the string before casting it to decimal.

Table data with col1 and col2 columns


 val r = sqlContext.sql("select col2  from nd2629.test")
+-+
| col2|
+-+
| 1.00|
|  2.0|
|  123.798|
| 123456.6|
+-+



val r = sqlContext.sql("select CAST(TRIM(col2) as decimal(10,4)) from
nd2629.test")

+---+
|_c0|
+---+
| 1.|
| 2.|
|   123.7980|
|123456.6778|
+---+



 val r = sqlContext.sql("select CAST(TRIM(col2) as decimal(10,5)) from
nd2629.test")
+-+
|  _c0|
+-+
|  1.0|
|  2.0|
|123.79800|
| null|
+-+


You need to choose the scale so that (precision - scale) still covers the
integer part of the largest value. In the above case the integer part of
123456.6778 has 6 digits, so with precision 10 the scale can be at most 4:
decimal(10,4) works, while decimal(10,5) leaves only 5 digits for the
integer part and the value overflows to null.
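
Here is a small self-contained sketch of the same behaviour, assuming a
Spark 2.x spark-shell where `spark` is the SparkSession; the data matches
the table above and the variable names are made up:

import org.apache.spark.sql.functions.{col, trim}
import spark.implicits._

val df = Seq("1.00", "2.0", "123.798", "123456.6778").toDF("col2")

// 6 integer digits + 4 digits of scale = 10, so decimal(10,4) holds every value
df.select(trim(col("col2")).cast("decimal(10,4)")).show()

// decimal(10,5) leaves only 5 digits for the integer part, so 123456.6778 becomes null
df.select(trim(col("col2")).cast("decimal(10,5)")).show()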


Thank you,
Naresh




On Tue, Jan 30, 2018 at 8:48 PM, Arnav kumar  wrote:

> Hi Experts
>
> I am trying to convert a string with decimal value to decimal in Spark Sql
> and load it into Hive/Sql Server.
>
> In Hive instead of getting converted to decimal all my values are coming
> as null.
>
> In Sql Server instead of getting decimal values are coming without
> precision
>
> Can you please let me know if this is any kind of limitation
>
> Here is my code
>
>
> // select the required columns from the actual data frame
> val query ="""select eventId,
> cast(eventData.latitude as Decimal(10,10)) as Latitude,
> cast(eventData.longitude as Decimal(10,10)) as Longitude from event"""
>
> // create the event data frame
> val eventTableDF = sparkSession.sql(query)
> // print the schema for debugging purposes
> eventTableDF.printSchema()
>
> root
>  |-- eventId: string (nullable = true)
>  |-- Latitude: decimal(10,10) (nullable = true)
>  |-- Longitude: decimal(10,10) (nullable = true)
>
>
>
>  val eventTableDF = sparkSession.sql(query)
>   import sparkSession.implicits._
>   eventTableDF.write.mode(org.apache.spark.sql.SaveMode.Append).insertInto(eventTable)
>
>
>
>
>
> With Best Regards
> Arnav Kumar
>
>
>
>


Re: How to hold some data in memory while processing rows in a DataFrame?

2018-01-22 Thread naresh Goud
If I understand your requirement correctly: use a broadcast variable to
replicate the small amount of data you want to reuse across all nodes.
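
A minimal Scala sketch of the idea, with made-up data and column names and
assuming an existing SparkSession named `spark` (the same pattern exists in
PySpark via sc.broadcast):

import spark.implicits._

// the small reference data we want available on every executor
val lookup = Map("NY" -> "New York", "CA" -> "California")
val bcLookup = spark.sparkContext.broadcast(lookup)

val df = Seq(("NY", 1), ("CA", 2), ("TX", 3)).toDF("state_code", "id")

// each task reads the broadcast copy instead of shipping the map with every closure
val enriched = df.rdd.map { row =>
  val code = row.getAs[String]("state_code")
  (code, bcLookup.value.getOrElse(code, "unknown"))
}
enriched.collect().foreach(println)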



On Mon, Jan 22, 2018 at 9:24 PM David Rosenstrauch 
wrote:

> This seems like an easy thing to do, but I've been banging my head against
> the wall for hours trying to get it to work.
>
> I'm processing a spark dataframe (in python).  What I want to do is, as
> I'm processing it I want to hold some data from one record in some local
> variables in memory, and then use those values later while I'm processing a
> subsequent record.  But I can't see any way to do this.
>
> I tried using:
>
> dataframe.select(a_custom_udf_function('some_column'))
>
> ... and then reading/writing to local variables in the udf function, but I
> can't get this to work properly.
>
> My next guess would be to use dataframe.foreach(a_custom_function) and try
> to save data to local variables in there, but I have a suspicion that may
> not work either.
>
>
> What's the correct way to do something like this in Spark?  In Hadoop I
> would just go ahead and declare local variables, and read and write to them
> in my map function as I like.  (Although with the knowledge that a) the
> same map function would get repeatedly called for records with many
> different keys, and b) there would be many different instances of my code
> spread across many machines, and so each map function running on an
> instance would only see a subset of the records.)  But in Spark it seems to
> be extraordinarily difficult to create local variables that can be read
> from / written to across different records in the dataframe.
>
> Perhaps there's something obvious I'm missing here?  If so, any help would
> be greatly appreciated!
>
> Thanks,
>
> DR
>
>