Fixed byte array issue

2023-11-01 Thread KhajaAsmath Mohammed
Hi,

I am facing a fixed-length byte array issue when reading a Spark dataframe.
Setting spark.sql.parquet.enableVectorizedReader = false solves the issue, but
it causes a significant performance hit. Is there any other resolution for
this?

Thanks,
Asmath
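
For the archives, a minimal Scala sketch of a workaround that keeps the slower
code path confined to the affected read (the path below is hypothetical, and an
existing SparkSession named spark is assumed):

// Disable the vectorized Parquet reader only around the problematic read,
// then restore it so other queries keep the faster vectorized code path.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
val problemDf = spark.read.parquet("/data/problem_table")   // hypothetical path
problemDf.count()
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")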


Maximum executors in EC2 Machine

2023-10-23 Thread KhajaAsmath Mohammed
Hi,

I am running a Spark job on an EC2 machine which has 40 cores. Driver
and executor memory is 16 GB. I am using local[*] but I still get only one
executor (the driver). Is there a way to get more executors with this config?

I am not using YARN or Mesos in this case. It is a single machine, which is
enough for our workload, but the data has increased.

Thanks,
Asmath
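
For reference, a rough Scala sketch under stated assumptions: local[*] always
gives exactly one executor (embedded in the driver) but still runs up to 40
tasks in parallel, one per core. To get several separate executor JVMs on the
same box, one option is a single-node standalone cluster; the master URL and
sizing below are assumptions, not a verified setup.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("spark://localhost:7077")        // assumed single-node standalone master
  .config("spark.executor.cores", "5")     // e.g. 8 executors x 5 cores on a 40-core box
  .config("spark.executor.memory", "4g")
  .appName("multi-executor-single-node")
  .getOrCreate()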


Re: Spark Structured Streaming -- Cannot consume next messages

2022-07-21 Thread KhajaAsmath Mohammed
I was able to figure it out. The job had previously been run by a different user 
against the HDFS directory where the data is pushed, and the resulting permission 
mismatch caused this issue.

Thanks,
Asmath

> On Jul 21, 2022, at 4:22 PM, Artemis User  wrote:
> 
> Not sure what you mean by offerts/offsets.  I assume you were using 
> file-based instead of Kafka-based data sources.  Is the incoming data 
> generated in mini-batch files or in a single large file?  Have you had this 
> type of problem before?
> 
>> On 7/21/22 1:02 PM, KhajaAsmath Mohammed wrote:
>> Hi,
>> 
>> I am seeing weird behavior in our spark structured streaming application 
>> where the offsets are not getting picked up by the streaming job.
>> 
>> If I delete the checkpoint directory and run the job again, I can see the 
>> data for the first batch but it is not picking up new offsets again from the 
>> next job when the job is running.
>> 
>> FYI, job is still running but it is not picking up new offsets. I am not 
>> able to figure out where the issue is in this case.
>> 
>> Thanks,
>> Asmath
> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark Structured Streaming -- Cannot consume next messages

2022-07-21 Thread KhajaAsmath Mohammed
Hi,

I am seeing weird behavior in our spark structured streaming application
where the offsets are not getting picked up by the streaming job.

If I delete the checkpoint directory and run the job again, I can see the
data for the first batch but it is not picking up new offsets again from
the next job when the job is running.

FYI, job is still running but it is not picking up new offsets. I am not
able to figure out where the issue is in this case.

Thanks,
Asmath


Spark streaming / confluent Kafka- messages are empty

2022-06-09 Thread KhajaAsmath Mohammed


Hi,

I am trying to read data from Confluent Kafka using the Avro schema registry. 
The messages are always empty and the stream always shows empty records. Any 
suggestions on this please?

Thanks,
Asmath
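
For the archives, one frequent cause of empty records here is that Confluent's
Avro serializer prepends a 5-byte wire-format header (magic byte + schema id)
that plain Avro deserialization does not expect. Below is a rough Scala sketch
assuming Spark 3.x with the spark-avro module; the broker, topic and schema are
placeholders, and full Schema Registry integration is usually done with a
helper library such as ABRiS rather than by hand.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, expr}
import org.apache.spark.sql.avro.functions.from_avro

val spark = SparkSession.builder().appName("confluent-avro").getOrCreate()

// Plain Avro schema of the payload (placeholder).
val avroSchema =
  """{"type":"record","name":"Event","fields":[{"name":"id","type":"string"}]}"""

val raw = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")   // placeholder
  .option("subscribe", "events")                      // placeholder
  .load()

// Strip the 5-byte Confluent header before decoding, otherwise every record
// comes back null or garbled.
val parsed = raw
  .withColumn("payload", expr("substring(value, 6, length(value) - 5)"))
  .select(from_avro(col("payload"), avroSchema).alias("event"))
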
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



REGEX Spark - Dataframe

2021-06-26 Thread KhajaAsmath Mohammed
Hi,

What is the equivalent function using the DataFrame API in Spark? I was able
to make it work in Spark SQL but am looking to use DataFrames instead.

df11=self.spark.sql("""SELECT  transaction_card_bin,(CASE WHEN
transaction_card_bin  REGEXP '^5[1-5][\d]*' THEN "MC"
WHEN transaction_card_bin  REGEXP '^4[\d]*' THEN "VISA"
WHEN transaction_card_bin  REGEXP '^3[47][\d]*' THEN "AMEX"
WHEN transaction_card_bin  REGEXP
'^(6011|622(12[6-9]|1[3-9][0-9]|[2-8][0-9][0-9]|9[0-1][0-9]|92[0-5])|64[4-9]|65)[\d]*'
THEN "DISC"
ELSE "OTHER" END ) AS cardtype FROM  test12  """)


Thanks,

Asmath
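
For the archives, a roughly equivalent DataFrame-API version in Scala (rlike is
the DataFrame counterpart of SQL REGEXP; a DataFrame named df with a
transaction_card_bin column is assumed):

import org.apache.spark.sql.functions.{col, when}

val withCardType = df.withColumn("cardtype",
  when(col("transaction_card_bin").rlike("^5[1-5]\\d*"), "MC")
    .when(col("transaction_card_bin").rlike("^4\\d*"), "VISA")
    .when(col("transaction_card_bin").rlike("^3[47]\\d*"), "AMEX")
    .when(col("transaction_card_bin").rlike(
      "^(6011|622(12[6-9]|1[3-9][0-9]|[2-8][0-9][0-9]|9[0-1][0-9]|92[0-5])|64[4-9]|65)\\d*"), "DISC")
    .otherwise("OTHER"))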


S3 Access Issues - Spark

2021-05-18 Thread KhajaAsmath Mohammed
Hi,

I have written a sample Spark job that reads the data residing in HBase. I
keep getting the error below; any suggestions to resolve this please?


Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and Secret
Access Key must be specified by setting the fs.s3.awsAccessKeyId and
fs.s3.awsSecretAccessKey properties (respectively).
at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:74)


  conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3.S3FileSystem")
  conf.set("fs.s3.awsAccessKeyId", "ddd")
  conf.set("fs.s3.awsSecretAccessKey", "dd")

  conf.set("fs.s3n.impl",
"org.apache.hadoop.fs.s3native.NativeS3FileSystem")
  conf.set("fs.s3n.awsAccessKeyId", "xxx")
  conf.set("fs.s3n.awsSecretAccessKey", "")

I tried these settings in both the Spark config and the HBase config, but none
of them resolved my issue.

Thanks,
Asmath
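
For reference, the fs.s3 / S3FileSystem connector shown above is the old
blockstore implementation; on current Hadoop builds the s3a connector is the
one that normally picks up these credentials. A hedged Scala sketch, assuming
hadoop-aws and the AWS SDK are on the classpath, with placeholder credentials
and bucket:

// Assumes an existing SparkSession named spark.
val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")   // placeholder
hadoopConf.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")   // placeholder

val s3Df = spark.read.parquet("s3a://my-bucket/path/")   // hypothetical bucket/path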


Re: [EXTERNAL] Urgent Help - Py Spark submit error

2021-05-15 Thread KhajaAsmath Mohammed
Thanks everyone. I was able to resolve this. 

Here is what I did: I just passed the conf file using the --files option.

The mistake I made was reading the JSON conf file before creating the Spark
session. Reading it after creating the Spark session fixed it. Thanks once
again for your valuable suggestions.

Thanks,
Asmath

> On May 15, 2021, at 8:12 AM, Sean Owen  wrote:
> 
> 
> If code running on the executors need some local file like a config file, 
> then it does have to be passed this way. That much is normal.
> 
>> On Sat, May 15, 2021 at 1:41 AM Gourav Sengupta  
>> wrote:
>> Hi,
>> 
>> once again lets start with the requirement. Why are you trying to pass xml 
>> and json files to SPARK instead of reading them in SPARK? 
>> Generally when people pass on files they are python or jar files.
>> 
>> Regards,
>> Gourav
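
For the archives, a Scala sketch of the pattern described above (the thread
itself is PySpark): ship the file with --files and resolve it only after the
SparkSession exists. In YARN cluster mode the shipped copy lands in the
container's working directory; the fallback via SparkFiles is an assumption,
not guaranteed behaviour on every deployment.

import org.apache.spark.SparkFiles
import org.apache.spark.sql.SparkSession
import scala.io.Source

val spark = SparkSession.builder().appName("conf-from-files").getOrCreate()

// conf.json was shipped with:  spark-submit --files /appl/common/ftp/conf.json ...
val confFile = new java.io.File("conf.json")   // YARN container working directory
val confJson =
  if (confFile.exists()) Source.fromFile(confFile).mkString
  else Source.fromFile(SparkFiles.get("conf.json")).mkString   // fallback
// parse confJson with the JSON library of your choice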


Re: [EXTERNAL] Urgent Help - Py Spark submit error

2021-05-14 Thread KhajaAsmath Mohammed
Here is my updated spark-submit command, still without any luck:

spark-submit --master yarn --deploy-mode cluster --files
/appl/common/ftp/conf.json,/etc/hive/conf/hive-site.xml,/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
--num-executors 6 --executor-cores 3 --driver-cores 3 --driver-memory 7g
--executor-memory 7g /appl/common/ftp/ftp_event_data.py
/appl/common/ftp/conf.json 2021-05-10 7

On Fri, May 14, 2021 at 6:19 PM KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Sorry my bad, it did not resolve the issue. I still have the same issue.
> can anyone please guide me. I was still running as a client instead of a
> cluster.
>
> On Fri, May 14, 2021 at 5:05 PM KhajaAsmath Mohammed <
> mdkhajaasm...@gmail.com> wrote:
>
>> You are right. It worked but I still don't understand why I need to pass
>> that to all executors.
>>
>> On Fri, May 14, 2021 at 5:03 PM KhajaAsmath Mohammed <
>> mdkhajaasm...@gmail.com> wrote:
>>
>>> I am using json only to read properties before calling spark session. I
>>> don't know why we need to pass that to all executors.
>>>
>>>
>>> On Fri, May 14, 2021 at 5:01 PM Longjiang.Yang <
>>> longjiang.y...@target.com> wrote:
>>>
>>>> Could you check whether this file is accessible in executors? (is it in
>>>> HDFS or in the client local FS)
>>>> /appl/common/ftp/conf.json
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> *From: *KhajaAsmath Mohammed 
>>>> *Date: *Friday, May 14, 2021 at 4:50 PM
>>>> *To: *"user @spark" 
>>>> *Subject: *[EXTERNAL] Urgent Help - Py Spark submit error
>>>>
>>>>
>>>>
>>>> /appl/common/ftp/conf.json
>>>>
>>>


Re: [EXTERNAL] Urgent Help - Py Spark submit error

2021-05-14 Thread KhajaAsmath Mohammed
Sorry my bad, it did not resolve the issue. I still have the same issue.
can anyone please guide me. I was still running as a client instead of a
cluster.

On Fri, May 14, 2021 at 5:05 PM KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> You are right. It worked but I still don't understand why I need to pass
> that to all executors.
>
> On Fri, May 14, 2021 at 5:03 PM KhajaAsmath Mohammed <
> mdkhajaasm...@gmail.com> wrote:
>
>> I am using json only to read properties before calling spark session. I
>> don't know why we need to pass that to all executors.
>>
>>
>> On Fri, May 14, 2021 at 5:01 PM Longjiang.Yang 
>> wrote:
>>
>>> Could you check whether this file is accessible in executors? (is it in
>>> HDFS or in the client local FS)
>>> /appl/common/ftp/conf.json
>>>
>>>
>>>
>>>
>>>
>>> *From: *KhajaAsmath Mohammed 
>>> *Date: *Friday, May 14, 2021 at 4:50 PM
>>> *To: *"user @spark" 
>>> *Subject: *[EXTERNAL] Urgent Help - Py Spark submit error
>>>
>>>
>>>
>>> /appl/common/ftp/conf.json
>>>
>>


Re: [EXTERNAL] Urgent Help - Py Spark submit error

2021-05-14 Thread KhajaAsmath Mohammed
You are right. It worked but I still don't understand why I need to pass
that to all executors.

On Fri, May 14, 2021 at 5:03 PM KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> I am using json only to read properties before calling spark session. I
> don't know why we need to pass that to all executors.
>
>
> On Fri, May 14, 2021 at 5:01 PM Longjiang.Yang 
> wrote:
>
>> Could you check whether this file is accessible in executors? (is it in
>> HDFS or in the client local FS)
>> /appl/common/ftp/conf.json
>>
>>
>>
>>
>>
>> *From: *KhajaAsmath Mohammed 
>> *Date: *Friday, May 14, 2021 at 4:50 PM
>> *To: *"user @spark" 
>> *Subject: *[EXTERNAL] Urgent Help - Py Spark submit error
>>
>>
>>
>> /appl/common/ftp/conf.json
>>
>


Re: [EXTERNAL] Urgent Help - Py Spark submit error

2021-05-14 Thread KhajaAsmath Mohammed
I am using json only to read properties before calling spark session. I
don't know why we need to pass that to all executors.


On Fri, May 14, 2021 at 5:01 PM Longjiang.Yang 
wrote:

> Could you check whether this file is accessible in executors? (is it in
> HDFS or in the client local FS)
> /appl/common/ftp/conf.json
>
>
>
>
>
> *From: *KhajaAsmath Mohammed 
> *Date: *Friday, May 14, 2021 at 4:50 PM
> *To: *"user @spark" 
> *Subject: *[EXTERNAL] Urgent Help - Py Spark submit error
>
>
>
> /appl/common/ftp/conf.json
>


Urgent Help - Py Spark submit error

2021-05-14 Thread KhajaAsmath Mohammed
Hi,

I am in a weird situation where the command below works when the deploy mode
is client and fails when it is cluster.

spark-submit --master yarn --deploy-mode client --files
/etc/hive/conf/hive-site.xml,/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
--driver-memory 70g --num-executors 6 --executor-cores 3 --driver-cores 3
--driver-memory 7g --py-files /appl/common/ftp/ftp_event_data.py
 /appl/common/ftp/ftp_event_data.py /appl/common/ftp/conf.json 2021-05-10 7



21/05/14 17:34:39 INFO ApplicationMaster: Waiting for spark context
initialization...
21/05/14 17:34:39 WARN SparkConf: The configuration key
'spark.yarn.executor.memoryOverhead' has been deprecated as of Spark 2.3
and may be removed in the future. Please use the new key
'spark.executor.memoryOverhead' instead.
21/05/14 17:34:39 ERROR ApplicationMaster: User application exited with
status 1
21/05/14 17:34:39 INFO ApplicationMaster: Final app status: FAILED,
exitCode: 13, (reason: User application exited with status 1)
21/05/14 17:34:39 ERROR ApplicationMaster: Uncaught exception:
org.apache.spark.SparkException: Exception thrown in awaitResult:
at
org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
at
org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:447)
at
org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:275)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:799)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:798)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at
org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:798)
at
org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
Caused by: org.apache.spark.SparkUserAppException: User application exited
with 1
at
org.apache.spark.deploy.PythonRunner$.main(PythonRunner.scala:106)
at org.apache.spark.deploy.PythonRunner.main(PythonRunner.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:667)
21/05/14 17:34:39 INFO ApplicationMaster: Deleting staging directory
hdfs://dev-cbb-datalake/user/nifiuser/.sparkStaging/application_1620318563358_0046
21/05/14 17:34:41 INFO ShutdownHookManager: Shutdown hook called


For more detailed output, check the application tracking page:
https://srvbigddvlsh115.us.dev.corp:8090/cluster/app/application_1620318563358_0046
Then click on links to logs of each attempt.
. Failing the application.
Exception in thread "main" org.apache.spark.SparkException: Application
application_1620318563358_0046 finished with failed status
at org.apache.spark.deploy.yarn.Client.run(Client.scala:1155)
at
org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1603)
at org.apache.spark.deploy.SparkSubmit.org
$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:851)
at
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
at
org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:926)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:935)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
21/05/14 17:34:42 INFO util.ShutdownHookManager: Shutdown hook called
21/05/14 17:34:42 INFO util.ShutdownHookManager: Deleting directory
/tmp/spark-28fa7d64-5a1d-42fb-865f-e9bb24854e7c
21/05/14 17:34:42 INFO util.ShutdownHookManager: Deleting directory
/tmp/spark-db93f731-d48a-4a7b-986f-e0a016bbd7f3

Thanks,
Asmath


spark hbase

2021-04-20 Thread KhajaAsmath Mohammed
Hi,

I have tried multiple ways to use hbase-spark and none of them works as
expected. SHC and the hbase-spark library load all the data onto the executors
and the job runs forever.

https://ramottamado.dev/how-to-use-hbase-fuzzyrowfilter-in-spark/

The link above has the solution I am looking for, but somehow the dataframe is
not filtering those records. Any suggestions?


Thanks,
Asmath


Spark submit hbase issue

2021-04-14 Thread KhajaAsmath Mohammed
Hi,

spark-submit is connecting to localhost instead of the ZooKeeper quorum mentioned in 
hbase-site.xml. The same program works in the IDE, where it picks up 
hbase-site.xml. What am I missing in spark-submit?
> 
> 
> spark-submit --driver-class-path  
> C:\Users\mdkha\bitbucket\clx-spark-scripts\src\test\resources\hbase-site.xml 
> --files 
> C:\Users\mdkha\bitbucket\clx-spark-scripts\src\test\resources\hbase-site.xml  
> --conf 
> "spark.driver.extraLibraryPath=C:\Users\mdkha\bitbucket\clx-spark-scripts\src\test\resources\hbase-site.xml"
>  --executor-memory 4g --class com.drivers.HBASEExportToS3 
> C:\Users\mdkha\bitbucket\clx-spark-scripts\target\clx-spark-scripts.jar -c 
> C:\Users\mdkha\bitbucket\clx-spark-scripts\src\test\resources\job.properties 
> 
> 
> 21/04/14 16:13:36 WARN ClientCnxn: Session 0x0 for server null, unexpected 
> error, closing socket connection and attempting reconnect
> java.net.ConnectException: Connection refused: no further information
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:715)
> at 
> org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
> at 
> org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
> 21/04/14 16:13:37 WARN ReadOnlyZKClient: 0x2d73767e to localhost:2181 failed 
> for get of /hbase/hbaseid, code = CONNECTIONLOSS, retries = 1

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Override Jackson jar - Spark Submit

2021-04-14 Thread KhajaAsmath Mohammed
Hi,

I am having a similar issue to the one described in the link below, but was
not able to resolve it. Any other solutions please?

https://stackoverflow.com/questions/57329060/exclusion-of-dependency-of-spark-core-in-cdh

Thanks,
Asmath


Spark2.4 json Jackson errors

2021-04-13 Thread KhajaAsmath Mohammed
Hi,

I am having an issue when running custom applications on Spark 2.4. I was able
to run them successfully in my Windows IDE but cannot run them on EMR with
Spark 2.4. I get a JsonMethods not found error.

I have included json4s in the uber jar but I still get this error. Any solution
to resolve this?

Thanks,
Asmath
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Session error with 30s

2021-04-13 Thread KhajaAsmath Mohammed
I was able to resolve this by changing hdfs-site.xml, as I mentioned in my 
initial thread.

Thanks,
Asmath

> On Apr 12, 2021, at 8:35 PM, Peng Lei  wrote:
> 
> 
> Hi KhajaAsmath Mohammed
>   Please check the configuration of "spark.speculation.interval", just pass 
> the "30" to it.
>   
>  '''
>   override def start(): Unit = {
>   backend.start()
> 
>   if (!isLocal && conf.get(SPECULATION_ENABLED)) {
> logInfo("Starting speculative execution thread")
> speculationScheduler.scheduleWithFixedDelay(
>   () => Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() },
>   SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
>   }
> }
>  '''
>   
> 
> Sean Owen  wrote on Tue, Apr 13, 2021 at 3:30 AM:
>> Something is passing this invalid 30s value, yes. Hard to say which property 
>> it is. I'd check if your cluster config sets anything with the value 30s - 
>> whatever is reading this property is not expecting it. 
>> 
>>> On Mon, Apr 12, 2021, 2:25 PM KhajaAsmath Mohammed 
>>>  wrote:
>>> Hi Sean,
>>> 
>>> Do you think anything that can cause this with DFS client?
>>> 
>>> java.lang.NumberFormatException: For input string: "30s"
>>> at 
>>> java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
>>> at java.lang.Long.parseLong(Long.java:589)
>>> at java.lang.Long.parseLong(Long.java:631)
>>> at 
>>> org.apache.hadoop.conf.Configuration.getLong(Configuration.java:1429)
>>> at 
>>> org.apache.hadoop.hdfs.client.impl.DfsClientConf.(DfsClientConf.java:247)
>>> at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:301)
>>> at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:285)
>>> at 
>>> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:160)
>>> at 
>>> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2859)
>>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:99)
>>> at 
>>> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2896)
>>> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2878)
>>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:392)
>>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:184)
>>> at 
>>> org.apache.spark.deploy.yarn.Client$$anonfun$8.apply(Client.scala:137)
>>> at 
>>> org.apache.spark.deploy.yarn.Client$$anonfun$8.apply(Client.scala:137)
>>> at scala.Option.getOrElse(Option.scala:121)
>>> at org.apache.spark.deploy.yarn.Client.(Client.scala:137)
>>> at 
>>> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:56)
>>> at 
>>> org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:183)
>>> at org.apache.spark.SparkContext.(SparkContext.scala:501)
>>> at 
>>> org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
>>> at 
>>> org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:936)
>>> at 
>>> org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession
>>> 
>>> Thanks,
>>> Asmath
>>> 
>>>> On Mon, Apr 12, 2021 at 2:20 PM KhajaAsmath Mohammed 
>>>>  wrote:
>>>> I am using spark hbase connector provided by hortonwokrs. I was able to 
>>>> run without issues in my local environment and has this issue in emr. 
>>>> 
>>>> Thanks,
>>>> Asmath
>>>> 
>>>>>> On Apr 12, 2021, at 2:15 PM, Sean Owen  wrote:
>>>>>> 
>>>>> 
>>>>> Somewhere you're passing a property that expects a number, but give it 
>>>>> "30s". Is it a time property somewhere that really just wants MS or 
>>>>> something? But most time properties (all?) in Spark should accept that 
>>>>> type of input anyway. Really depends on what property has a problem and 
>>>>> what is setting it.
>>>>> 
>>>>>> On Mon, Apr 12, 2021 at 1:56 PM KhajaAsmath Mohammed 
>>>>>>  wrote:
>>>>>> HI,
>>>>>> 
>>>>>> I am getting weird error when running spark job in emr cluster. Same 
>>>>>> program runs in my local machine. Is there anything that I need to do to 
>>>>>> resolve this?
>>>>>> 
>>>>>> 21/04/12 18:48:45 ERROR SparkContext: Error initializing SparkContext.
>>>>>> java.lang.NumberFormatException: For input string: "30s"
>>>>>> 
>>>>>> I tried the solution mentioned in the link below but it didn't work for 
>>>>>> me.
>>>>>> 
>>>>>> https://hadooptutorials.info/2020/10/11/part-5-using-spark-as-execution-engine-for-hive-2/
>>>>>> 
>>>>>> Thanks,
>>>>>> Asmath


Re: Spark Session error with 30s

2021-04-12 Thread KhajaAsmath Mohammed
Hi Sean,

Do you think anything that can cause this with DFS client?

java.lang.NumberFormatException: For input string: "30s"
at
java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Long.parseLong(Long.java:589)
at java.lang.Long.parseLong(Long.java:631)



at org.apache.hadoop.conf.Configuration.getLong(Configuration.java:1429)
at
org.apache.hadoop.hdfs.client.impl.DfsClientConf.<init>(DfsClientConf.java:247)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:301)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:285)
at
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:160)
at
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2859)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:99)
at
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2896)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2878)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:392)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:184)
at
org.apache.spark.deploy.yarn.Client$$anonfun$8.apply(Client.scala:137)
at
org.apache.spark.deploy.yarn.Client$$anonfun$8.apply(Client.scala:137)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.deploy.yarn.Client.<init>(Client.scala:137)
at
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:56)
at
org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:183)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:501)
at
org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
at
org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:936)
at
org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession

Thanks,
Asmath

On Mon, Apr 12, 2021 at 2:20 PM KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> I am using the Spark HBase connector provided by Hortonworks. I was able to
> run without issues in my local environment but have this issue in EMR.
>
> Thanks,
> Asmath
>
> On Apr 12, 2021, at 2:15 PM, Sean Owen  wrote:
>
> 
> Somewhere you're passing a property that expects a number, but give it
> "30s". Is it a time property somewhere that really just wants MS or
> something? But most time properties (all?) in Spark should accept that type
> of input anyway. Really depends on what property has a problem and what is
> setting it.
>
> On Mon, Apr 12, 2021 at 1:56 PM KhajaAsmath Mohammed <
> mdkhajaasm...@gmail.com> wrote:
>
>> HI,
>>
>> I am getting weird error when running spark job in emr cluster. Same
>> program runs in my local machine. Is there anything that I need to do to
>> resolve this?
>>
>> 21/04/12 18:48:45 ERROR SparkContext: Error initializing SparkContext.
>> java.lang.NumberFormatException: For input string: "30s"
>>
>> I tried the solution mentioned in the link below but it didn't work for
>> me.
>>
>>
>> https://hadooptutorials.info/2020/10/11/part-5-using-spark-as-execution-engine-for-hive-2/
>>
>> Thanks,
>> Asmath
>>
>


Re: Spark Session error with 30s

2021-04-12 Thread KhajaAsmath Mohammed
I am using the Spark HBase connector provided by Hortonworks. I was able to run 
without issues in my local environment but have this issue in EMR. 

Thanks,
Asmath

> On Apr 12, 2021, at 2:15 PM, Sean Owen  wrote:
> 
> 
> Somewhere you're passing a property that expects a number, but give it "30s". 
> Is it a time property somewhere that really just wants MS or something? But 
> most time properties (all?) in Spark should accept that type of input anyway. 
> Really depends on what property has a problem and what is setting it.
> 
>> On Mon, Apr 12, 2021 at 1:56 PM KhajaAsmath Mohammed 
>>  wrote:
>> HI,
>> 
>> I am getting weird error when running spark job in emr cluster. Same program 
>> runs in my local machine. Is there anything that I need to do to resolve 
>> this?
>> 
>> 21/04/12 18:48:45 ERROR SparkContext: Error initializing SparkContext.
>> java.lang.NumberFormatException: For input string: "30s"
>> 
>> I tried the solution mentioned in the link below but it didn't work for me.
>> 
>> https://hadooptutorials.info/2020/10/11/part-5-using-spark-as-execution-engine-for-hive-2/
>> 
>> Thanks,
>> Asmath


Spark Session error with 30s

2021-04-12 Thread KhajaAsmath Mohammed
HI,

I am getting a weird error when running a Spark job on an EMR cluster. The same
program runs on my local machine. Is there anything that I need to do to
resolve this?

21/04/12 18:48:45 ERROR SparkContext: Error initializing SparkContext.
java.lang.NumberFormatException: For input string: "30s"

I tried the solution mentioned in the link below but it didn't work for me.

https://hadooptutorials.info/2020/10/11/part-5-using-spark-as-execution-engine-for-hive-2/

Thanks,
Asmath


Spark Hbase Hive error in EMR

2021-04-09 Thread KhajaAsmath Mohammed
Hi,

I am trying to connect to HBase, which is exposed in Hive as an external table. I
am getting the exception below. Am I missing anything to pass here?


21/04/09 18:08:11 INFO ZooKeeper: Client environment:user.dir=/
21/04/09 18:08:11 INFO ZooKeeper: Initiating client connection,
connectString=localhost:2181 sessionTimeout=9
watcher=org.apache.hadoop.hbase.zookeeper.PendingWatcher@7217e615
21/04/09 18:08:11 INFO ConnectionManager$HConnectionImplementation: Closing
zookeeper sessionid=0x0
21/04/09 18:08:11 INFO ClientCnxn: Opening socket connection to server
localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL
(unknown error)
21/04/09 18:08:11 INFO ZooKeeper: Session: 0x0 closed
21/04/09 18:08:11 INFO ClientCnxn: EventThread shut down

java.io.IOException: java.lang.reflect.InvocationTargetException
21/04/09 18:08:11 INFO AppLog$:
java.io.IOException: java.lang.reflect.InvocationTargetException

java.lang.reflect.InvocationTargetException

21/04/09 18:08:11 INFO AppLog$:
java.lang.reflect.InvocationTargetException

21/04/09 18:08:11 INFO SparkContext: Invoking stop() from shutdown hook

spark-submit --files
/etc/hbase/conf/hbase-site.xml,/etc/hive/conf/hive-site.xml --jars
/usr/lib/hive/lib/hive-hbase-handler.jar,/usr/lib/hbase/hbase-client.jar,/usr/lib/hbase/hbase-protocol.jar,/usr/lib/hbase/hbase-common.jar,/usr/lib/hbase/hbase-it.jar,/usr/lib/hbase/hbase-server.jar,/usr/lib/hbase/hbase-hadoop2-compat.jar
--deploy-mode client --executor-memory 4g --class
com.figg.clx.drivers.HBASEExportToS3


Thanks,
Asmath


Re: Rdd - zip with index

2021-03-25 Thread KhajaAsmath Mohammed
Hi Mich,

Yes, you are right. We are getting gz files and this is causing the issue. I 
will change it to bzip2 or another splittable format and try running it again 
today. 

Thanks,
Asmath

Sent from my iPhone

> On Mar 25, 2021, at 6:51 AM, Mich Talebzadeh  
> wrote:
> 
> 
> Hi Asmath,
> 
> Have you actually managed to run this single file? Because Spark (as brought 
> up a few times already) will pull the whole of the GZ file in a single 
> partition in the driver, and can get an out of memory error.
> 
> HTH
> 
>view my Linkedin profile
> 
>  
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> 
>> On Wed, 24 Mar 2021 at 01:19, KhajaAsmath Mohammed  
>> wrote:
>> Hi,
>> 
>> I have 10gb file that should be loaded into spark dataframe. This file is 
>> csv with header and we were using rdd.zipwithindex to get column names and 
>> convert to avro accordingly. 
>> 
>> I am assuming this is taking long time and only executor runs and never 
>> achieves parallelism. Is there a easy way to achieve parallelism after 
>> filtering out the header. 
>> 
>> I am
>> Also interested in solution that can remove header from the file and I can 
>> give my own schema. This way I can split the files.
>> 
>> Rdd.partitions is always 1 for this even after repartitioning the dataframe 
>> after zip with index . Any help on this topic please .
>> 
>> Thanks,
>> Asmath
>> 
>> Sent from my iPhone
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 


Re: Rdd - zip with index

2021-03-24 Thread KhajaAsmath Mohammed
Thanks Mich. I understood what I am supposed to do now and will try these
options.

I still don't understand how Spark will split the large file. I have a
10 GB file which I want to split automatically after reading. I can split
and load the file before reading, but that is a very big requirement change
for all our data pipelines.

Is there a way to split the file once it is read, to achieve parallelism?
I will run a groupBy on one column to see if that improves my job.

On Wed, Mar 24, 2021 at 10:56 AM Mich Talebzadeh 
wrote:

> How does Spark establish there is a csv header as a matter of interest?
>
> Example
>
> val df = spark.read.option("header", true).csv(location)
>
> I need to tell spark to ignore the header correct?
>
> From Spark Read CSV file into DataFrame — SparkByExamples
> 
>
> If you have a header with column names on file, you need to explicitly
> specify true for header option using option("header",true)
> 
>  not
> mentioning this, the API treats header as a data record.
>
> Second point which may not be applicable to the newer versions of Spark. My
> understanding is that the gz file is not splittable, therefore Spark needs
> to read the whole file using a single core which will slow things down (CPU
> intensive). After the read is done the data can be shuffled to increase
> parallelism.
>
> HTH
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 24 Mar 2021 at 12:40, Sean Owen  wrote:
>
>> No need to do that. Reading the header with Spark automatically is
>> trivial.
>>
>> On Wed, Mar 24, 2021 at 5:25 AM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> If it is a csv then it is a flat file somewhere in a directory I guess.
>>>
>>> Get the header out by doing
>>>
>>> */usr/bin/zcat csvfile.gz |head -n 1*
>>> Title Number,Tenure,Property
>>> Address,District,County,Region,Postcode,Multiple Address Indicator,Price
>>> Paid,Proprietor Name (1),Company Registration No. (1),Proprietorship
>>> Category (1),Country Incorporated (1),Proprietor (1) Address (1),Proprietor
>>> (1) Address (2),Proprietor (1) Address (3),Proprietor Name (2),Company
>>> Registration No. (2),Proprietorship Category (2),Country Incorporated
>>> (2),Proprietor (2) Address (1),Proprietor (2) Address (2),Proprietor (2)
>>> Address (3),Proprietor Name (3),Company Registration No. (3),Proprietorship
>>> Category (3),Country Incorporated (3),Proprietor (3) Address (1),Proprietor
>>> (3) Address (2),Proprietor (3) Address (3),Proprietor Name (4),Company
>>> Registration No. (4),Proprietorship Category (4),Country Incorporated
>>> (4),Proprietor (4) Address (1),Proprietor (4) Address (2),Proprietor (4)
>>> Address (3),Date Proprietor Added,Additional Proprietor Indicator
>>>
>>>
>>> 10GB is not much of a big CSV file
>>>
>>> that will resolve the header anyway.
>>>
>>>
>>> Also how are you running the spark, in a local mode (single jvm) or
>>> other distributed modes (yarn, standalone) ?
>>>
>>>
>>> HTH
>>>
>>
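
For the archives, a short Scala sketch of the approach discussed above: let the
CSV reader handle the header (no zipWithIndex needed) and repartition right
after the read, since a .gz file is not splittable and always arrives as a
single partition. The paths and partition count are assumptions, and writing
Avro assumes the spark-avro module is on the classpath.

val df = spark.read
  .option("header", "true")                 // header handled by the reader
  .csv("/data/input/large_file.csv.gz")     // hypothetical path; gz => 1 partition on read
  .repartition(200)                         // spread the data before the heavy work

df.write.format("avro").save("/data/output/avro")   // hypothetical output path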


Re: Rdd - zip with index

2021-03-23 Thread KhajaAsmath Mohammed
So spark by default doesn’t split the large 10gb file when loaded? 

Sent from my iPhone

> On Mar 23, 2021, at 8:44 PM, Yuri Oleynikov (‫יורי אולייניקוב‬‎) 
>  wrote:
> 
> Hi, Mohammed 
> I think that the reason that only one executor is running and have single 
> partition is because you have single file that might be read/loaded into 
> memory.
> 
> In order to achieve better parallelism I’d suggest to split the csv file.
> 
> Another problem is question: why are you using rdd?
> Just Spark.read.option(“header”, 
> true).load()..select().write.format(“avro”).save(...)
> 
> 
>> On 24 Mar 2021, at 03:19, KhajaAsmath Mohammed  
>> wrote:
>> 
>> Hi,
>> 
>> I have 10gb file that should be loaded into spark dataframe. This file is 
>> csv with header and we were using rdd.zipwithindex to get column names and 
>> convert to avro accordingly. 
>> 
>> I am assuming this is taking long time and only executor runs and never 
>> achieves parallelism. Is there a easy way to achieve parallelism after 
>> filtering out the header. 
>> 
>> I am
>> Also interested in solution that can remove header from the file and I can 
>> give my own schema. This way I can split the files.
>> 
>> Rdd.partitions is always 1 for this even after repartitioning the dataframe 
>> after zip with index . Any help on this topic please .
>> 
>> Thanks,
>> Asmath
>> 
>> Sent from my iPhone
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Rdd - zip with index

2021-03-23 Thread KhajaAsmath Mohammed
Hi,

I have a 10 GB file that should be loaded into a Spark dataframe. The file is CSV 
with a header, and we were using rdd.zipWithIndex to get the column names and 
convert to Avro accordingly. 

I am assuming this is why it takes a long time: only one executor runs and the job 
never achieves parallelism. Is there an easy way to achieve parallelism after 
filtering out the header? 

I am also interested in a solution that removes the header from the file so I can 
supply my own schema. That way I can split the files.

rdd.partitions is always 1 for this, even after repartitioning the dataframe 
after zipWithIndex. Any help on this topic please.

Thanks,
Asmath

Sent from my iPhone
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Repartition or Coalesce not working

2021-03-22 Thread KhajaAsmath Mohammed
Thanks Sean. I just realized that. Let me try it.

On Mon, Mar 22, 2021 at 12:31 PM Sean Owen  wrote:

> You need to do something with the result of repartition. You haven't
> changed textDF
>
> On Mon, Mar 22, 2021, 12:15 PM KhajaAsmath Mohammed <
> mdkhajaasm...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a use case where there are large files in hdfs.
>>
>> Size of the file is 3 GB.
>>
>> It is an existing code in production and I am trying to improve the
>> performance of the job.
>>
>> Sample Code:
>> textDF=dataframe ( This is dataframe that got created from hdfs path)
>> logging.info("Number of partitions"+str(txt_df.rdd.getNumPartitions()))
>> --> Prints 1
>> textDF.repartition(100)
>> logging.info("Number of partitions"+str(txt_df.rdd.getNumPartitions()))
>> --> Prints 1
>>
>> Any suggestions  on why this is happening?
>>
>> Next Block of the code which takes time:
>> rdd.filter(lambda line: len(line)!=collistlenth)
>>
>> any way to parallelize and speed up my process on this?
>>
>> Thanks,
>> Asmath
>>
>
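
In other words, DataFrames are immutable and repartition() returns a new
DataFrame; the result has to be captured. A tiny Scala sketch (textDF stands
for the DataFrame from the thread):

val repartitionedDF = textDF.repartition(100)
println(s"partitions = ${repartitionedDF.rdd.getNumPartitions}")   // now 100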


Repartition or Coalesce not working

2021-03-22 Thread KhajaAsmath Mohammed
Hi,

I have a use case where there are large files in hdfs.

Size of the file is 3 GB.

It is an existing code in production and I am trying to improve the
performance of the job.

Sample Code:
textDF=dataframe ( This is dataframe that got created from hdfs path)
logging.info("Number of partitions"+str(txt_df.rdd.getNumPartitions()))
--> Prints 1
textDF.repartition(100)
logging.info("Number of partitions"+str(txt_df.rdd.getNumPartitions()))
--> Prints 1

Any suggestions  on why this is happening?

Next Block of the code which takes time:
rdd.filter(lambda line: len(line)!=collistlenth)

any way to parallelize and speed up my process on this?

Thanks,
Asmath


Re: Spark Structured streaming - Kafka - slowness with query 0

2020-10-21 Thread KhajaAsmath Mohammed
Thanks. Do we have an option to limit the number of records, like processing only 1 
(or whatever value we pass)? That way we can control the amount of data for the 
batches that we need. 

Sent from my iPhone

> On Oct 21, 2020, at 12:11 AM, lec ssmi  wrote:
> 
> 
> Structured streaming's bottom layer also uses a micro-batch mechanism. 
> It seems that the first batch is slower than the later ones; I also often 
> encounter this problem. It feels related to the division of batches. 
> On the other hand, Spark's batch size is usually bigger than Flume's 
> transaction batch size. 
> 
> 
> KhajaAsmath Mohammed  wrote on Wed, Oct 21, 2020 at 12:19 PM:
>> Yes. Changing back to latest worked but I still see the slowness compared to 
>> flume. 
>> 
>> Sent from my iPhone
>> 
>>>> On Oct 20, 2020, at 10:21 PM, lec ssmi  wrote:
>>>> 
>>> 
>>> Do you start your application  with  chasing the early Kafka data  ? 
>>> 
>>> Lalwani, Jayesh  wrote on Wed, Oct 21, 2020 at 2:19 AM:
>>>> Are you getting any output? Streaming jobs typically run forever, and keep 
>>>> processing data as it comes in the input. If a streaming job is working 
>>>> well, it will typically generate output at a certain cadence
>>>> 
>>>>  
>>>> 
>>>> From: KhajaAsmath Mohammed 
>>>> Date: Tuesday, October 20, 2020 at 1:23 PM
>>>> To: "user @spark" 
>>>> Subject: [EXTERNAL] Spark Structured streaming - Kakfa - slowness with 
>>>> query 0
>>>> 
>>>>  
>>>> 
>>>> CAUTION: This email originated from outside of the organization. Do not 
>>>> click links or open attachments unless you can confirm the sender and know 
>>>> the content is safe.
>>>> 
>>>>  
>>>> 
>>>> Hi,
>>>> 
>>>>  
>>>> 
>>>> I have started using Spark structured streaming for reading data from Kafka 
>>>> and the job is very slow. The number of output rows keeps increasing in query 
>>>> 0 and the job is running forever. Any suggestions for this please? 
>>>> 
>>>>  
>>>> 
>>>> 
>>>>  
>>>> 
>>>> Thanks,
>>>> 
>>>> Asmath
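
For the archives, the Kafka source does expose a per-trigger cap. A short Scala
sketch with placeholder broker/topic values and an assumed limit:

// maxOffsetsPerTrigger caps the total number of offsets (records) consumed per
// micro-batch, spread across the topic's partitions.
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")   // placeholder
  .option("subscribe", "events")                      // placeholder
  .option("startingOffsets", "latest")
  .option("maxOffsetsPerTrigger", "10000")            // assumed limit
  .load()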


Re: Spark Structured streaming - Kafka - slowness with query 0

2020-10-20 Thread KhajaAsmath Mohammed
Yes. Changing back to latest worked but I still see the slowness compared to 
flume. 

Sent from my iPhone

> On Oct 20, 2020, at 10:21 PM, lec ssmi  wrote:
> 
> 
> Do you start your application  with  chasing the early Kafka data  ? 
> 
> Lalwani, Jayesh  wrote on Wed, Oct 21, 2020 at 2:19 AM:
>> Are you getting any output? Streaming jobs typically run forever, and keep 
>> processing data as it comes in the input. If a streaming job is working 
>> well, it will typically generate output at a certain cadence
>> 
>>  
>> 
>> From: KhajaAsmath Mohammed 
>> Date: Tuesday, October 20, 2020 at 1:23 PM
>> To: "user @spark" 
>> Subject: [EXTERNAL] Spark Structured streaming - Kafka - slowness with query 0
>> 
>>  
>> 
>> CAUTION: This email originated from outside of the organization. Do not 
>> click links or open attachments unless you can confirm the sender and know 
>> the content is safe.
>> 
>>  
>> 
>> Hi,
>> 
>>  
>> 
>> I have started using Spark structured streaming for reading data from Kafka 
>> and the job is very slow. The number of output rows keeps increasing in query 0 
>> and the job is running forever. Any suggestions for this please? 
>> 
>>  
>> 
>> 
>> 
>>  
>> 
>> Thanks,
>> 
>> Asmath


Spark Structured streaming - Kafka - slowness with query 0

2020-10-20 Thread KhajaAsmath Mohammed
Hi,

I have started using Spark structured streaming for reading data from Kafka,
and the job is very slow. The number of output rows keeps increasing in query 0
and the job is running forever. Any suggestions for this please?

[image: image.png]

Thanks,
Asmath


Spark JDBC- OAUTH example

2020-09-30 Thread KhajaAsmath Mohammed
Hi,

I am looking for some information on how to read from a database that uses
OAuth authentication with Spark JDBC. Any links describing this approach would
be really helpful.

Thanks,
Asmath


Fwd: Time stamp in Kafka

2020-08-15 Thread KhajaAsmath Mohammed
Hi,
> 
> We have a producer application that has written data to a Kafka topic. 
> 
> We are reading the data from the Kafka topic using Spark streaming, but the 
> timestamp on Kafka shows as 1969-12-31 for all the data. 
> 
> Is there a way to fix this while reading?
> 
> Thanks,
> Asmath 
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark Structured streaming 2.4 - Kill and deploy in yarn

2020-08-10 Thread KhajaAsmath Mohammed
Hi ,

I am looking for some information on how to gracefully kill a Spark
structured streaming Kafka job and redeploy it.

How do I kill a Spark structured streaming job in YARN?
Any suggestions on how to kill it gracefully?

I was able to monitor the job from the SQL tab, but how can I kill this job
when it is deployed in YARN without knowing the YARN application id?

Thanks,
Asmath
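
One pattern that avoids hunting for the YARN application id is to let the
driver watch for a marker file and stop its own query. A hedged Scala sketch;
the marker path, sink and checkpoint locations are assumptions, and df stands
for the streaming DataFrame being written:

import org.apache.hadoop.fs.{FileSystem, Path}

val markerPath = new Path("/tmp/streaming/stop_marker")   // assumed marker file
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

val query = df.writeStream
  .format("parquet")                                      // assumed sink
  .option("path", "/data/out")
  .option("checkpointLocation", "/data/chk")
  .start()

// Poll every 30s; stop() halts the query, and with checkpointing enabled the
// next run resumes from the last committed offsets, so nothing is lost.
while (query.isActive && !fs.exists(markerPath)) {
  query.awaitTermination(30000)
}
if (query.isActive) query.stop()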


Application Upgrade - structured streaming

2020-07-10 Thread KhajaAsmath Mohammed
Hi,

When upgrading a Spark structured streaming application, do we need to delete
the checkpoint, or does it start consuming offsets from the point we left off?

For the Kafka source we need to use the option "startingOffsets" with a JSON string like

""" {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """

Do we need to pass this on restart by storing the offsets somewhere ourselves?

Thanks,
Asmath

Sent from my iPhone
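
For what it's worth, the usual behaviour: with a checkpointLocation set, a
restarted query resumes from the offsets recorded in the checkpoint, and
startingOffsets is only consulted the very first time the query runs (empty
checkpoint directory), provided the upgraded code stays compatible with the
existing checkpoint. A Scala sketch with placeholder broker, topics and paths:

val input = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")       // placeholder
  .option("subscribe", "topicA,topicB")                   // placeholder
  .option("startingOffsets", "earliest")                  // used only on the very first run
  .load()

input.writeStream
  .format("parquet")                                      // assumed sink
  .option("path", "/data/events")
  .option("checkpointLocation", "/checkpoints/events")    // keep this across upgrades
  .start()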

Re: Spark structured streaming -Kafka - deployment / monitor and restart

2020-07-06 Thread KhajaAsmath Mohammed
Thanks Lim, this is really helpful. I have a few questions.

Our earlier approach used a low-level consumer to read offsets from a database
and used that information to read with Spark Streaming DStreams, saving the
offsets back once the processing finished. This way we never lost data.

With your library, will it automatically resume from the last offset it
processed when the application was stopped or killed for some time?

Thanks,
Asmath

On Sun, Jul 5, 2020 at 6:22 PM Jungtaek Lim 
wrote:

> There're sections in SS programming guide which exactly answer these
> questions:
>
>
> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries
>
> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries
>
> Also, for Kafka data source, there's a 3rd party project (DISCLAIMER: I'm
> the author) to help you commit the offset to Kafka with the specific group
> ID.
>
> https://github.com/HeartSaVioR/spark-sql-kafka-offset-committer
>
> After then, you can also leverage the Kafka ecosystem to monitor the
> progress in point of Kafka's view, especially the gap between highest
> offset and committed offset.
>
> Hope this helps.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
>
> On Mon, Jul 6, 2020 at 2:53 AM Gabor Somogyi 
> wrote:
>
>> In 3.0 the community just added it.
>>
>> On Sun, 5 Jul 2020, 14:28 KhajaAsmath Mohammed, 
>> wrote:
>>
>>> Hi,
>>>
>>> We are trying to move our existing code from spark dstreams to
>>> structured streaming for one of the old application which we built few
>>> years ago.
>>>
>>> Structured streaming job doesn’t have streaming tab in sparkui. Is there
>>> a way to monitor the job submitted by us in structured streaming ? Since
>>> the job runs for every trigger, how can we kill the job and restart if
>>> needed.
>>>
>>> Any suggestions on this please
>>>
>>> Thanks,
>>> Asmath
>>>
>>>
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>


Spark structured streaming -Kafka - deployment / monitor and restart

2020-07-05 Thread KhajaAsmath Mohammed
Hi,

We are trying to move our existing code from Spark DStreams to structured
streaming for one of the old applications we built a few years ago.

A structured streaming job doesn't have a Streaming tab in the Spark UI. Is
there a way to monitor the job we submitted in structured streaming? And since
the job runs on every trigger, how can we kill the job and restart it if needed?

Any suggestions on this please 

Thanks,
Asmath



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Loop through Dataframes

2019-10-06 Thread KhajaAsmath Mohammed
Hi,

What is the best approach to loop through 3 dataframes in Scala based on some
keys, instead of using collect?

Thanks,
Asmath
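
If the goal is to correlate rows across the three dataframes by those keys, a
join usually replaces the collect-and-loop pattern and keeps the work
distributed. A minimal Scala sketch; df1/df2/df3 and the key column names are
assumptions:

val joined = df1
  .join(df2, Seq("key1"), "inner")   // assumed shared key column
  .join(df3, Seq("key2"), "left")    // assumed second key column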


Re: Structured Streaming Error

2019-05-22 Thread KhajaAsmath Mohammed
I was able to resolve the error. Initially I was giving a custom name in the
"subscribe" option but the real topic name in the "topic" option. Giving the
same name in both places worked.

I am confused now about the difference between the "topic" and "subscribe"
options here. Do you have any suggestions?

  reader.option("subscribe", topics);
reader.option("startingOffsets", startOff);
reader.option("failOnDataLoss", false);
reader.option("topic", topics);

On Wed, May 22, 2019 at 8:10 AM Gabor Somogyi 
wrote:

> Have you tried what the exception suggests?
>
> If startingOffsets contains specific offsets, you must specify all
> TopicPartitions.
>
> BR,
> G
>
>
> On Tue, May 21, 2019 at 9:16 PM KhajaAsmath Mohammed <
> mdkhajaasm...@gmail.com> wrote:
>
>> Hi,
>>
>> I am getting the error below when running a sample streaming app. Does anyone
>> have a resolution for this?
>>
>> JSON OFFSET {"test":{"0":0,"1":0,"2":0,"3":0,"4":0,"5":0}}
>>  - Herreee
>> root
>>  |-- key: string (nullable = true)
>>  |-- value: string (nullable = true)
>>  |-- topic: string (nullable = true)
>>  |-- partition: string (nullable = true)
>>  |-- offset: string (nullable = true)
>>
>> Herreee
>> No physical plan. Waiting for data.
>>
>> org.apache.spark.sql.streaming.StreamingQueryException: assertion failed:
>> If startingOffsets contains specific offsets, you must specify all
>> TopicPartitions.
>> Use -1 for latest, -2 for earliest, if you don't care.
>> Specified: Set(test-0, test-5, test-1, test-4, test-3, test-2) Assigned:
>> Set()
>> === Streaming Query ===
>> Identifier: [id = d74f6c1a-fa30-4ae3-b87d-f1b03dc6e659, runId =
>> 31e49067-8cc3-4f9c-9817-fc311ae2a417]
>> Current Committed Offsets: {}
>> Current Available Offsets: {}
>>
>> Current State: ACTIVE
>> Thread State: RUNNABLE
>>
>> Logical Plan:
>> Project [cast(key#7 as string) AS key#21, cast(value#8 as string) AS
>> value#22, cast(topic#9 as string) AS topic#23, cast(partition#10 as string)
>> AS partition#24, cast(offset#11L as string) AS offset#25]
>> +- StreamingExecutionRelation KafkaV2[Subscribe[testStream]], [key#7,
>> value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
>>
>>
>> java.lang.AssertionError: assertion failed: If startingOffsets contains
>> specific offsets, you must specify all TopicPartitions.
>> Use -1 for latest, -2 for earliest, if you don't care.
>> Specified: Set(test-0, test-5, test-1, test-4, test-3, test-2) Assigned:
>> Set()
>>
>> Thanks,
>> Asmath
>>
>


Structured Streaming Error

2019-05-21 Thread KhajaAsmath Mohammed
Hi,

I am getting the error below when running a sample streaming app. Does anyone
have a resolution for this?

JSON OFFSET {"test":{"0":0,"1":0,"2":0,"3":0,"4":0,"5":0}}
 - Herreee
root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: string (nullable = true)
 |-- offset: string (nullable = true)

Herreee
No physical plan. Waiting for data.

org.apache.spark.sql.streaming.StreamingQueryException: assertion failed:
If startingOffsets contains specific offsets, you must specify all
TopicPartitions.
Use -1 for latest, -2 for earliest, if you don't care.
Specified: Set(test-0, test-5, test-1, test-4, test-3, test-2) Assigned:
Set()
=== Streaming Query ===
Identifier: [id = d74f6c1a-fa30-4ae3-b87d-f1b03dc6e659, runId =
31e49067-8cc3-4f9c-9817-fc311ae2a417]
Current Committed Offsets: {}
Current Available Offsets: {}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [cast(key#7 as string) AS key#21, cast(value#8 as string) AS
value#22, cast(topic#9 as string) AS topic#23, cast(partition#10 as string)
AS partition#24, cast(offset#11L as string) AS offset#25]
+- StreamingExecutionRelation KafkaV2[Subscribe[testStream]], [key#7,
value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]


java.lang.AssertionError: assertion failed: If startingOffsets contains
specific offsets, you must specify all TopicPartitions.
Use -1 for latest, -2 for earliest, if you don't care.
Specified: Set(test-0, test-5, test-1, test-4, test-3, test-2) Assigned:
Set()

Thanks,
Asmath


Spark SQL JDBC teradata syntax error

2019-05-03 Thread KhajaAsmath Mohammed
Hi

I have followed the link
https://community.teradata.com/t5/Connectivity/Teradata-JDBC-Driver-returns-the-wrong-schema-column-nullability/m-p/77824
to connect to Teradata from Spark.

I was able to print the schema if I give a table name instead of a SQL query.

I am getting the error below if I give a query (code snippet from the above
link). Any help is appreciated.

Exception in thread "main" java.sql.SQLException: [Teradata Database]
[TeraJDBC 16.20.00.10] [Error 3707] [SQLState 42000] Syntax error, expected
something like an 'EXCEPT' keyword or an 'UNION' keyword or a 'MINUS'
keyword between the word 'VEHP91_BOM' and '?'.
at
com.teradata.jdbc.jdbc_4.util.ErrorFactory.makeDatabaseSQLException(ErrorFactory.java:309)
at
com.teradata.jdbc.jdbc_4.statemachine.ReceiveInitSubState.action(ReceiveInitSubState.java:103)
at
com.teradata.jdbc.jdbc_4.statemachine.StatementReceiveState.subStateMachine(StatementReceiveState.java:311)
at
com.teradata.jdbc.jdbc_4.statemachine.StatementReceiveState.action(StatementReceiveState.java:200)
at
com.teradata.jdbc.jdbc_4.statemachine.StatementController.runBody(StatementController.java:137)
at
com.teradata.jdbc.jdbc_4.statemachine.StatementController.run(StatementController.java:128)
at
com.teradata.jdbc.jdbc_4.TDStatement.executeStatement(TDStatement.java:389)
at com.teradata.jdbc.jdbc_4.TDStatement.prepareRequest(TDStatement.java:576)
at
com.teradata.jdbc.jdbc_4.TDPreparedStatement.<init>(TDPreparedStatement.java:131)
at
com.teradata.jdbc.jdk6.JDK6_SQL_PreparedStatement.<init>(JDK6_SQL_PreparedStatement.java:30)
at
com.teradata.jdbc.jdk6.JDK6_SQL_Connection.constructPreparedStatement(JDK6_SQL_Connection.java:82)
at com.teradata.jdbc.jdbc_4.TDSession.prepareStatement(TDSession.java:1337)
at com.teradata.jdbc.jdbc_4.TDSession.prepareStatement(TDSession.java:1381)
at com.teradata.jdbc.jdbc_4.TDSession.prepareStatement(TDSession.java:1367)


Thanks,
Asmath


Spark SQL Teradata load is very slow

2019-05-02 Thread KhajaAsmath Mohammed
Hi,

I have a Teradata table that has more than 2.5 billion records, and the data
size is around 600 GB. I am not able to pull it efficiently using Spark SQL,
and the job has been running for more than 11 hours. Here is my code:

  val df2 = sparkSession.read.format("jdbc")
.option("url", "jdbc:teradata://PROD/DATABASE=101")
.option("user", "HDFS_TD")
.option("password", "C")
.option("dbtable", "")
.option("numPartitions", partitions)
.option("driver","com.teradata.jdbc.TeraDriver")
.option("partitionColumn", "itm_bloon_seq_no")
.option("lowerBound", config.getInt("lowerBound"))
.option("upperBound", config.getInt("upperBound"))

The lower bound is 0 and the upper bound is 300. Spark is using multiple
executors, but most of the executors finish quickly and a few executors take
much longer, maybe due to shuffling or something else.

I also tried repartitioning on a column, but no luck. Is there a better way to
load this faster?

The Teradata object is a view, not a table.

Thanks,
Asmath
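
One thing that sometimes helps when the natural partition column is skewed:
push a derived, evenly distributed bucket column down to Teradata as a subquery
and partition on that instead. This is only a sketch; the view name, MOD-based
bucket and credentials are assumptions, not the thread's actual schema.

val query =
  "(SELECT t.*, (itm_bloon_seq_no MOD 300) AS part_bucket FROM db.my_view t) src"

val df = sparkSession.read.format("jdbc")
  .option("url", "jdbc:teradata://PROD/DATABASE=101")
  .option("driver", "com.teradata.jdbc.TeraDriver")
  .option("dbtable", query)                 // subquery pushed down to Teradata
  .option("partitionColumn", "part_bucket") // evenly distributed 0..299
  .option("lowerBound", "0")
  .option("upperBound", "300")
  .option("numPartitions", "300")
  .option("user", "HDFS_TD")
  .option("password", "***")
  .load()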


Java Heap Space error - Spark ML

2019-03-22 Thread KhajaAsmath Mohammed
Hi,

I am getting the below exception when using Spark Kmeans. Any solutions
from the experts. Would be really helpful.

val kMeans = new KMeans().setK(reductionCount).setMaxIter(30)

val kMeansModel = kMeans.fit(df)

The error occurs when calling kMeans.fit:


Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at
org.apache.spark.mllib.linalg.SparseVector.toArray(Vectors.scala:760)
at
org.apache.spark.mllib.clustering.VectorWithNorm.toDense(KMeans.scala:614)
at
org.apache.spark.mllib.clustering.KMeans$$anonfun$initKMeansParallel$3.apply(KMeans.scala:382)
at
org.apache.spark.mllib.clustering.KMeans$$anonfun$initKMeansParallel$3.apply(KMeans.scala:382)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at
org.apache.spark.mllib.clustering.KMeans.initKMeansParallel(KMeans.scala:382)
at
org.apache.spark.mllib.clustering.KMeans.runAlgorithm(KMeans.scala:256)
at org.apache.spark.mllib.clustering.KMeans.run(KMeans.scala:227)
at org.apache.spark.ml.clustering.KMeans.fit(KMeans.scala:319)
at
com.datamantra.spark.DataBalancing$.createBalancedDataframe(DataBalancing.scala:25)
at
com.datamantra.spark.jobs.IftaMLTraining$.trainML$1(IftaMLTraining.scala:182)
at
com.datamantra.spark.jobs.IftaMLTraining$.main(IftaMLTraining.scala:94)
at
com.datamantra.spark.jobs.IftaMLTraining.main(IftaMLTraining.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Thanks,
Asmath



Streaming Tab in Kafka Structured Streaming

2019-02-18 Thread KhajaAsmath Mohammed
Hi,

I am new to structured streaming but have used DStreams a lot. One difference I
see in the Spark UI is the Streaming tab for DStreams.

Is there a way to know how many records and batches were executed in
structured streaming, and is there any option to get a streaming tab?

Thanks,
Asmath
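
Until a dedicated UI tab is available, the usual substitute is a
StreamingQueryListener, which reports per-trigger batch ids and input row
counts (query.lastProgress gives the same information on demand). A minimal
Scala sketch, assuming an existing SparkSession named spark:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"query started: ${event.id}")
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"batch=${event.progress.batchId} rows=${event.progress.numInputRows}")
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"query terminated: ${event.id}")
})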


Transaction example for Spark Streaming in Spark 2.2

2018-03-22 Thread KhajaAsmath Mohammed
Hi Cody,

I am following this to implement exactly-once semantics and also to store the
offsets in a database. The question I have is how to use Hive instead of
traditional datastores: the write to Hive will succeed even if there is an
issue saving the offsets into the DB. Could you please correct me if I am
wrong, or let me know if you have any other suggestions.

stream.foreachRDD { rdd =>
   if (!rdd.isEmpty()) {
   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  DB.localTx { implicit session =>
   // Write data to Hive after creating dataframes from the DStream RDD

// Store Offsets to DB
offsetRanges.foreach { osr =>
  val offsetRows = sql"""
  update txn_offsets set offset =
${osr.untilOffset}
where topic = ${osr.topic} and part =
${osr.partition} and offset = ${osr.fromOffset}
  """.update.apply()
  if (offsetRows != 1) {
throw new Exception(s"""
  Got $offsetRows rows affected instead of 1
when attempting to update offsets for
   ${osr.topic} ${osr.partition}
${osr.fromOffset} -> ${osr.untilOffset}
  Was a partition repeated after a worker
failure?
  """)
  }
}

  }
   }
}

Thanks,
Asmath


Dynamic allocation Spark Streaming

2018-03-06 Thread KhajaAsmath Mohammed
Hi,

I have enabled dynamic allocation for a Spark streaming application, but the
number of containers always shows as 2. Is there a way to get more while the
job is running?




Thanks,
Asmath


Joins in spark for large tables

2018-02-28 Thread KhajaAsmath Mohammed
Hi,

Is there a best approach to reduce shuffling in Spark? I have two tables and
both of them are large. Any suggestions? I have only seen broadcast joins, but
that will not work in my case.

Thanks,
Asmath
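
A rough Scala sketch of two common options when broadcast is not possible,
assuming two DataFrames dfA and dfB joined on a column "id" (the names and the
partition counts are assumptions): pre-partition both sides on the join key when
the key is reused across several joins/aggregations, or write both tables
bucketed on the key so the shuffle is paid once at write time.

import org.apache.spark.sql.functions.col

val parts = 400                                    // tune to the data volume
val a = dfA.repartition(parts, col("id"))
val b = dfB.repartition(parts, col("id"))
val joined = a.join(b, Seq("id"))                  // sort-merge join over co-partitioned inputs

// alternative: bucketed tables, useful when the same join key is hit repeatedly
dfA.write.bucketBy(400, "id").sortBy("id").saveAsTable("db.a_bucketed")
dfB.write.bucketBy(400, "id").sortBy("id").saveAsTable("db.b_bucketed")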


Re: Efficient way to compare the current row with previous row contents

2018-02-12 Thread KhajaAsmath Mohammed
I am also looking for the same answer. Will this work in a streaming application
too?

Sent from my iPhone

> On Feb 12, 2018, at 8:12 AM, Debabrata Ghosh  wrote:
> 
> Georg - Thanks ! Will you be able to help me with a few examples please.
> 
> Thanks in advance again !
> 
> Cheers,
> D
> 
>> On Mon, Feb 12, 2018 at 6:03 PM, Georg Heiler  
>> wrote:
>> You should look into window functions for spark sql. 
>> Debabrata Ghosh  schrieb am Mo. 12. Feb. 2018 um 
>> 13:10:
>>> Hi,
>>>  Greetings !
>>> 
>>>  I needed some efficient way in pyspark to execute a 
>>> comparison (on all the attributes) between the current row and the previous 
>>> row. My intent here is to leverage the distributed framework of Spark to 
>>> the best extent so that can achieve a good speed. Please can anyone suggest 
>>> me a suitable algorithm / command. Here is a snapshot of the underlying 
>>> data which I need to compare:
>>> 
>>> 
>>> 
>>> Thanks in advance !
>>> 
>>> D
> 
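
A small Scala sketch of the window-function approach mentioned above, assuming a
DataFrame df with a grouping key "id", an ordering column "event_time", and a
column "value" to compare with the previous row (this style of window function
applies to batch DataFrames; it is not supported inside a structured streaming
query):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag}

val w = Window.partitionBy("id").orderBy("event_time")

// previous row's value within each group, null for the first row
val withPrev = df.withColumn("prev_value", lag(col("value"), 1).over(w))

// rows whose value differs from the previous row
val changed = withPrev.filter(col("prev_value").isNull || col("value") =!= col("prev_value"))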


Structured streaming to Hive with Kafka 0.9

2018-02-01 Thread KhajaAsmath Mohammed
Hi,

Could anyone please share an example of how to use Spark structured streaming
with Kafka and write the data into Hive? The versions that I have are

Spark 2.1 on CDH5.10
Kafka 0.9

Thanks,
Asmath 

Sent from my iPhone



Spark Streaming checkpoint

2018-01-29 Thread KhajaAsmath Mohammed
Hi,

I have written a Spark streaming job that uses checkpointing. I stopped the
streaming job for 5 days and then restarted it today.

I have encountered a weird issue where it shows zero records for all cycles
so far. Is this causing data loss?

[image: Inline image 1]


Thanks,
Asmath


Production Critical : Data loss in spark streaming

2018-01-22 Thread KhajaAsmath Mohammed
Hi,

I have been using Spark streaming with Kafka. I have to restart the application
daily due to a KMS issue, and after the restart the offsets do not match the
point where I left off. I am creating the checkpoint directory with

val streamingContext = StreamingContext.getOrCreate(checkPointDir, () =>
createStreamingContext(checkPointDir, sparkSession, batchInt, kafkaParams,
topicsSet, config, sparkConfig))

Batch 1:


Batch 2: After Restart and completion of two batches.


[image: Inline image 1]
Thanks,
Asmath


Gracefully shutdown spark streaming application

2018-01-21 Thread KhajaAsmath Mohammed
Hi,

Could anyone please share your thoughts on how to kill a Spark streaming
application gracefully?

I followed link of
http://why-not-learn-something.blogspot.in/2016/05/apache-spark-streaming-how-to-do.html

https://github.com/lanjiang/streamingstopgraceful

I played around with either the property-based approach or adding a marker file
as in the GitHub link. The marker approach sometimes works successfully and
sometimes gets stuck.

Is there a more reliable way to kill the application?

Thanks,
Asmath
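
For reference, a minimal Scala sketch of the marker-file pattern from the linked
repository, assuming ssc is the StreamingContext and the HDFS path below is
agreed on out of band; stopGracefully = true lets the in-flight batch finish
before the shutdown:

import org.apache.hadoop.fs.{FileSystem, Path}

val markerPath = new Path("/user/appuser/stop_streaming_marker")   // assumed location
val fs = FileSystem.get(ssc.sparkContext.hadoopConfiguration)

ssc.start()
var stopped = false
while (!stopped) {
  // returns true once the context has terminated on its own
  stopped = ssc.awaitTerminationOrTimeout(60 * 1000)
  if (!stopped && fs.exists(markerPath)) {
    ssc.stop(stopSparkContext = true, stopGracefully = true)
    stopped = true
  }
}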


Re: Spark Stream is corrupted

2018-01-18 Thread KhajaAsmath Mohammed
Any solutions for this problem, please?

Sent from my iPhone

> On Jan 17, 2018, at 10:39 PM, KhajaAsmath Mohammed <mdkhajaasm...@gmail.com> 
> wrote:
> 
> Hi,
> 
> I have created a streaming object from checkpoint but it always through up 
> error as stream corrupted when I restart spark streaming job. any solution 
> for this?
> 
> private def createStreamingContext(
> sparkCheckpointDir: String, sparkSession: SparkSession,
> batchDuration: Int, config: com.typesafe.config.Config) = {
> val topics = config.getString(Constants.Properties.KafkaTopics)
> val topicsSet = topics.split(",").toSet
> val kafkaParams = Map[String, String]("metadata.broker.list" -> 
> config.getString(Constants.Properties.KafkaBrokerList))
> val ssc = new StreamingContext(sparkSession.sparkContext, 
> Seconds(batchDuration))
> val messages = KafkaUtils.createDirectStream[String, String, 
> StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
> val datapointDStream = 
> messages.map(_._2).map(TransformDatapoint.parseDataPointText)
> lazy val sqlCont = sparkSession.sqlContext
> 
> hiveDBInstance = config.getString("hiveDBInstance")
> 
> TransformDatapoint.readDstreamData(sparkSession, sqlCont, 
> datapointDStream, runMode, includeIndex, indexNum, datapointTmpTableName, 
> fencedDPTmpTableName, fencedVINDPTmpTableName, hiveDBInstance)
> 
> ssc.checkpoint(sparkCheckpointDir)
> ssc
>   }
> 
> 
> 
> // calling streming context method
> 
>  val streamingContext = 
> StreamingContext.getOrCreate(config.getString(Constants.Properties.CheckPointDir),
>  () => 
> createStreamingContext(config.getString(Constants.Properties.CheckPointDir), 
> sparkSession, config.getInt(Constants.Properties.BatchInterval), config))
> 
> ERROR:
> org.apache.spark.SparkException: Failed to read checkpoint from directory 
> hdfs://prodnameservice1/user/yyy1k78/KafkaCheckPointNTDSC
> 
> java.io.IOException: Stream is corrupted
> 
> 
> Thanks,
> Asmath


Spark Stream is corrupted

2018-01-17 Thread KhajaAsmath Mohammed
Hi,

I have created a streaming context from the checkpoint, but it always throws a
"stream is corrupted" error when I restart the Spark streaming job. Any solution
for this?

private def createStreamingContext(
sparkCheckpointDir: String, sparkSession: SparkSession,
batchDuration: Int, config: com.typesafe.config.Config) = {
val topics = config.getString(Constants.Properties.KafkaTopics)
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" ->
config.getString(Constants.Properties.KafkaBrokerList))
val ssc = new StreamingContext(sparkSession.sparkContext,
Seconds(batchDuration))
val messages = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
val datapointDStream =
messages.map(_._2).map(TransformDatapoint.parseDataPointText)
lazy val sqlCont = sparkSession.sqlContext

hiveDBInstance = config.getString("hiveDBInstance")

TransformDatapoint.readDstreamData(sparkSession, sqlCont,
datapointDStream, runMode, includeIndex, indexNum, datapointTmpTableName,
fencedDPTmpTableName, fencedVINDPTmpTableName, hiveDBInstance)

ssc.checkpoint(sparkCheckpointDir)
ssc
  }



// calling streming context method

 val streamingContext =
StreamingContext.getOrCreate(config.getString(Constants.Properties.CheckPointDir),
() =>
createStreamingContext(config.getString(Constants.Properties.CheckPointDir),
sparkSession, config.getInt(Constants.Properties.BatchInterval), config))

*ERROR:*
org.apache.spark.SparkException: Failed to read checkpoint from directory
hdfs://prodnameservice1/user/yyy1k78/KafkaCheckPointNTDSC

java.io.IOException: Stream is corrupted


Thanks,
Asmath


Re: Spark Streaming not reading missed data

2018-01-16 Thread KhajaAsmath Mohammed
Sometimes I get these messages in the logs but the job still runs. Do you have a
solution for how to fix this? I have added the code in my earlier email.

Exception in thread "pool-22-thread-9" java.lang.NullPointerException

at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(
Checkpoint.scala:233)

at java.util.concurrent.ThreadPoolExecutor.runWorker(
ThreadPoolExecutor.java:1145)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(
ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)

On Tue, Jan 16, 2018 at 3:16 PM, Jörn Franke <jornfra...@gmail.com> wrote:

> It could be a missing persist before the checkpoint
>
> > On 16. Jan 2018, at 22:04, KhajaAsmath Mohammed <mdkhajaasm...@gmail.com>
> wrote:
> >
> > Hi,
> >
> > Spark streaming job from kafka is not picking the messages and is always
> taking the latest offsets when streaming job is stopped for 2 hours. It is
> not picking up the offsets that are required to be processed from
> checkpoint directory.  any suggestions on how to process the old messages
> too when there is shutdown or planned maintenance?
> >
> >  val topics = config.getString(Constants.Properties.KafkaTopics)
> >   val topicsSet = topics.split(",").toSet
> >   val kafkaParams = Map[String, String]("metadata.broker.list" ->
> config.getString(Constants.Properties.KafkaBrokerList))
> >   val sparkSession: SparkSession = runMode match {
> > case "local" => SparkSession.builder.config(
> sparkConfig).getOrCreate
> > case "yarn"  => SparkSession.builder.config(sparkConfig).
> enableHiveSupport.getOrCreate
> >   }
> >   val streamingContext = new StreamingContext(sparkSession.sparkContext,
> Seconds(config.getInt(Constants.Properties.BatchInterval)))
> >   streamingContext.checkpoint(config.getString(Constants.
> Properties.CheckPointDir))
> >   val messages = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](streamingContext, kafkaParams, topicsSet)
> >   val datapointDStream = messages.map(_._2).map(TransformDatapoint.
> parseDataPointText)
> >   lazy val sqlCont = sparkSession.sqlContext
> >
> >   hiveDBInstance=config.getString("hiveDBInstance")
> >
> >   TransformDatapoint.readDstreamData(sparkSession,sqlCont,
> datapointDStream, runMode, includeIndex, indexNum, datapointTmpTableName,
> fencedDPTmpTableName, fencedVINDPTmpTableName,hiveDBInstance)
> >
> >   //transformDstreamData(sparkSession,datapointDStream,
> runMode,includeIndex,indexNum,datapointTmpTableName,fencedDPTmpTableName,
> fencedVINDPTmpTableName);
> >   streamingContext.checkpoint(config.getString(Constants.
> Properties.CheckPointDir))
> >   streamingContext.start()
> >   streamingContext.awaitTermination()
> >   streamingContext.stop(stopSparkContext = true, stopGracefully =
> true)
> >
> > Thanks,
> > Asmath
>


Spark Streaming not reading missed data

2018-01-16 Thread KhajaAsmath Mohammed
Hi,

The Spark streaming job reading from Kafka is not picking up the missed messages
and always starts from the latest offsets after the streaming job has been
stopped for 2 hours. It is not picking up the offsets that need to be processed
from the checkpoint directory. Any suggestions on how to also process the old
messages when there is a shutdown or planned maintenance?

 val topics = config.getString(Constants.Properties.KafkaTopics)
  val topicsSet = topics.split(",").toSet
  val kafkaParams = Map[String, String]("metadata.broker.list" ->
config.getString(Constants.Properties.KafkaBrokerList))
  val sparkSession: SparkSession = runMode match {
case "local" => SparkSession.builder.config(sparkConfig).getOrCreate
case "yarn"  =>
SparkSession.builder.config(sparkConfig).enableHiveSupport.getOrCreate
  }
  val streamingContext = new
StreamingContext(sparkSession.sparkContext,
Seconds(config.getInt(Constants.Properties.BatchInterval)))

streamingContext.checkpoint(config.getString(Constants.Properties.CheckPointDir))
  val messages = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](streamingContext, kafkaParams, topicsSet)
  val datapointDStream =
messages.map(_._2).map(TransformDatapoint.parseDataPointText)
  lazy val sqlCont = sparkSession.sqlContext

  hiveDBInstance=config.getString("hiveDBInstance")

  TransformDatapoint.readDstreamData(sparkSession,sqlCont,
datapointDStream, runMode, includeIndex, indexNum, datapointTmpTableName,
fencedDPTmpTableName, fencedVINDPTmpTableName,hiveDBInstance)


//transformDstreamData(sparkSession,datapointDStream,runMode,includeIndex,indexNum,datapointTmpTableName,fencedDPTmpTableName,fencedVINDPTmpTableName);

streamingContext.checkpoint(config.getString(Constants.Properties.CheckPointDir))
  streamingContext.start()
  streamingContext.awaitTermination()
  streamingContext.stop(stopSparkContext = true, stopGracefully = true)

Thanks,
Asmath
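
One hedged observation on the snippet above: the StreamingContext is constructed
directly on every run, so the checkpoint directory is only ever written, never
read, and a restart begins from the latest offsets. A sketch of the recovery
pattern, reusing the names from the snippet, with all stream wiring inside the
factory function:

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(sparkSession.sparkContext,
    Seconds(config.getInt(Constants.Properties.BatchInterval)))
  val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,
    StringDecoder](ssc, kafkaParams, topicsSet)
  // every DStream transformation and output operation must be defined in here
  TransformDatapoint.readDstreamData(sparkSession, sqlCont,
    messages.map(_._2).map(TransformDatapoint.parseDataPointText),
    runMode, includeIndex, indexNum, datapointTmpTableName,
    fencedDPTmpTableName, fencedVINDPTmpTableName, hiveDBInstance)
  ssc.checkpoint(config.getString(Constants.Properties.CheckPointDir))
  ssc
}

val streamingContext = StreamingContext.getOrCreate(
  config.getString(Constants.Properties.CheckPointDir), () => createContext())
streamingContext.start()
streamingContext.awaitTermination()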


Null pointer exception in checkpoint directory

2018-01-16 Thread KhajaAsmath Mohammed
Hi,

I keep getting a null pointer exception in the Spark streaming job with
checkpointing. Any suggestions to resolve this?

Exception in thread "pool-22-thread-9" java.lang.NullPointerException

at
org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:233)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)

Exception in thread "pool-22-thread-10" java.lang.NullPointerException

at
org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:233)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)

Exception in thread "pool-22-thread-11" java.lang.NullPointerException

at
org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:233)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)

Exception in thread "pool-22-thread-12" java.lang.NullPointerException

at
org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:233)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)

Thanks,
Asmath


Re: What does Blockchain technology mean for Big Data? And how Hadoop/Spark will play role with it?

2017-12-18 Thread KhajaAsmath Mohammed
I am looking for the same answer too; I will wait for responses from other people.

Sent from my iPhone

> On Dec 18, 2017, at 10:56 PM, Gaurav1809  wrote:
> 
> Hi All,
> 
> Will Bigdata tools & technology work with Blockchain in future? Any possible
> use cases that anyone is likely to face, please share.
> 
> Thanks
> Gaurav
> 
> 
> 
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 




Spark ListenerBus

2017-12-06 Thread KhajaAsmath Mohammed
Hi,

I am running a Spark SQL job and it completes without any issues, but after the
job completes I get errors such as:
ERROR: SparkListenerBus has already stopped! Dropping event
SparkListenerExecutorMetricsUpdate. Could anyone share suggestions on how to
avoid this?

Thanks,
Asmath


Re: JDK1.8 for spark workers

2017-11-29 Thread KhajaAsmath Mohammed
This didn't work. I tried it but had no luck.

On Wed, Nov 29, 2017 at 7:49 PM, Vadim Semenov <vadim.seme...@datadoghq.com>
wrote:

> You can pass `JAVA_HOME` environment variable
>
> `spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-1.8.0`
>
> On Wed, Nov 29, 2017 at 10:54 AM, KhajaAsmath Mohammed <
> mdkhajaasm...@gmail.com> wrote:
>
>> Hi,
>>
>> I am running cloudera version of spark2.1 and our cluster is on JDK1.7.
>> For some of the libraries, I need JDK1.8, is there a way to set to run
>> Spark worker in JDK1.8 without upgrading .
>>
>> I was able run driver in JDK 1.8 by setting the path but not the workers.
>>
>> 17/11/28 20:22:27 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
>> 1.0 (TID 1, brksvl267.brk.navistar.com, executor 1):
>> java.lang.UnsupportedClassVersionError: org/wololo/geojson/GeoJSON :
>> Unsupported major.minor version 52.0
>>
>> Thanks,
>> Asmath
>>
>
>


JDK1.8 for spark workers

2017-11-29 Thread KhajaAsmath Mohammed
Hi,

I am running the Cloudera version of Spark 2.1 and our cluster is on JDK 1.7.
For some of the libraries I need JDK 1.8; is there a way to run the Spark
workers on JDK 1.8 without upgrading the cluster?

I was able to run the driver on JDK 1.8 by setting the path, but not the workers.

17/11/28 20:22:27 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1.0
(TID 1, brksvl267.brk.navistar.com, executor 1):
java.lang.UnsupportedClassVersionError: org/wololo/geojson/GeoJSON :
Unsupported major.minor version 52.0

Thanks,
Asmath
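
A hedged configuration sketch for pointing both the YARN application master and
the executors at a JDK 8 install; the path is an assumption and that JDK has to
exist at the same location on every node:

val conf = new org.apache.spark.SparkConf()
  .set("spark.executorEnv.JAVA_HOME", "/usr/java/jdk1.8.0_144")        // executors
  .set("spark.yarn.appMasterEnv.JAVA_HOME", "/usr/java/jdk1.8.0_144")  // YARN AM (driver in cluster mode)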


Re: Spark Streaming Kerberos Issue

2017-11-22 Thread KhajaAsmath Mohammed
[image: Inline image 1]

This is what we are on.

On Wed, Nov 22, 2017 at 12:33 PM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> We use oracle JDK. we are on unix.
>
> On Wed, Nov 22, 2017 at 12:31 PM, Georg Heiler <georg.kf.hei...@gmail.com>
> wrote:
>
>> Do you use oracle or open jdk? We recently had an issue with open jdk:
>> formerly, java Security extensions were installed by default - no longer so
>> on centos 7.3
>>
>> Are these installed?
>>
>> KhajaAsmath Mohammed <mdkhajaasm...@gmail.com> schrieb am Mi. 22. Nov.
>> 2017 um 19:29:
>>
>>> I passed keytab, renewal is enabled by running the script every eight
>>> hours. User gets renewed by the script every eight hours.
>>>
>>> On Wed, Nov 22, 2017 at 12:27 PM, Georg Heiler <
>>> georg.kf.hei...@gmail.com> wrote:
>>>
>>>> Did you pass a keytab? Is renewal enabled in your kdc?
>>>> KhajaAsmath Mohammed <mdkhajaasm...@gmail.com> schrieb am Mi. 22. Nov.
>>>> 2017 um 19:25:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have written spark stream job and job is running successfully for
>>>>> more than 36 hours. After around 36 hours job gets failed with kerberos
>>>>> issue. Any solution on how to resolve it.
>>>>>
>>>>> org.apache.spark.SparkException: Task failed while wri\
>>>>>
>>>>> ting rows.
>>>>>
>>>>> at org.apache.spark.sql.hive.Spar
>>>>> kHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterC
>>>>> ontainers.scala:328)
>>>>>
>>>>> at org.apache.spark.sql.hive.exec
>>>>> ution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply
>>>>> (InsertIntoHiveTable.scala:210)
>>>>>
>>>>> at org.apache.spark.sql.hive.exec
>>>>> ution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply
>>>>> (InsertIntoHiveTable.scala:210)
>>>>>
>>>>> at org.apache.spark.scheduler.Res
>>>>> ultTask.runTask(ResultTask.scala:87)
>>>>>
>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:99)
>>>>>
>>>>> at org.apache.spark.executor.Exec
>>>>> utor$TaskRunner.run(Executor.scala:322)
>>>>>
>>>>> at java.util.concurrent.ThreadPoo
>>>>> lExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>
>>>>> at java.util.concurrent.ThreadPoo
>>>>> lExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>
>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> Caused by: org.apache.hadoop.hive.ql.metadata.HiveException:
>>>>> java.io.IOException: org.apache.hadoop.security.aut
>>>>> hentication.client.\
>>>>>
>>>>> AuthenticationException: 
>>>>> org.apache.hadoop.security.token.SecretManager$InvalidToken:
>>>>> token (kms-dt owner=va_dflt, renewer=yarn, re\
>>>>>
>>>>> alUser=, issueDate=1511262017635, maxDate=1511866817635,
>>>>> sequenceNumber=1854601, masterKeyId=3392) is expired
>>>>>
>>>>> at org.apache.hadoop.hive.ql.io.H
>>>>> iveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:248)
>>>>>
>>>>> at org.apache.spark.sql.hive.Spar
>>>>> kHiveDynamicPartitionWriterContainer.newOutputWriter$1(hiveW
>>>>> riterContainers.scala:346)
>>>>>
>>>>> at org.apache.spark.sql.hive.Spar
>>>>> kHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterC
>>>>> ontainers.scala:304)
>>>>>
>>>>> ... 8 more
>>>>>
>>>>> Caused by: java.io.IOException: org.apache.hadoop.security.aut
>>>>> hentication.client.AuthenticationException: org.apache.hadoop.securit\
>>>>>
>>>>> y.token.SecretManager$InvalidToken: token (kms-dt owner=va_dflt,
>>>>> renewer=yarn, realUser=, issueDate=1511262017635, maxDate=15118668\
>>>>>
>>>>> 17635, sequenceNumber=1854601, masterKeyId=3392) is expired
>>>>>
>>>>> at org.a

Re: Spark Streaming Kerberos Issue

2017-11-22 Thread KhajaAsmath Mohammed
We use the Oracle JDK. We are on Unix.

On Wed, Nov 22, 2017 at 12:31 PM, Georg Heiler <georg.kf.hei...@gmail.com>
wrote:

> Do you use oracle or open jdk? We recently had an issue with open jdk:
> formerly, java Security extensions were installed by default - no longer so
> on centos 7.3
>
> Are these installed?
>
> KhajaAsmath Mohammed <mdkhajaasm...@gmail.com> schrieb am Mi. 22. Nov.
> 2017 um 19:29:
>
>> I passed keytab, renewal is enabled by running the script every eight
>> hours. User gets renewed by the script every eight hours.
>>
>> On Wed, Nov 22, 2017 at 12:27 PM, Georg Heiler <georg.kf.hei...@gmail.com
>> > wrote:
>>
>>> Did you pass a keytab? Is renewal enabled in your kdc?
>>> KhajaAsmath Mohammed <mdkhajaasm...@gmail.com> schrieb am Mi. 22. Nov.
>>> 2017 um 19:25:
>>>
>>>> Hi,
>>>>
>>>> I have written spark stream job and job is running successfully for
>>>> more than 36 hours. After around 36 hours job gets failed with kerberos
>>>> issue. Any solution on how to resolve it.
>>>>
>>>> org.apache.spark.SparkException: Task failed while wri\
>>>>
>>>> ting rows.
>>>>
>>>> at org.apache.spark.sql.hive.
>>>> SparkHiveDynamicPartitionWriterContainer.writeToFile(
>>>> hiveWriterContainers.scala:328)
>>>>
>>>> at org.apache.spark.sql.hive.
>>>> execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.
>>>> apply(InsertIntoHiveTable.scala:210)
>>>>
>>>> at org.apache.spark.sql.hive.
>>>> execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.
>>>> apply(InsertIntoHiveTable.scala:210)
>>>>
>>>> at org.apache.spark.scheduler.
>>>> ResultTask.runTask(ResultTask.scala:87)
>>>>
>>>> at org.apache.spark.scheduler.Task.run(Task.scala:99)
>>>>
>>>> at org.apache.spark.executor.Executor$TaskRunner.run(
>>>> Executor.scala:322)
>>>>
>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(
>>>> ThreadPoolExecutor.java:1145)
>>>>
>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>>> ThreadPoolExecutor.java:615)
>>>>
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> Caused by: org.apache.hadoop.hive.ql.metadata.HiveException:
>>>> java.io.IOException: org.apache.hadoop.security.authentication.client.\
>>>>
>>>> AuthenticationException: org.apache.hadoop.security.
>>>> token.SecretManager$InvalidToken: token (kms-dt owner=va_dflt,
>>>> renewer=yarn, re\
>>>>
>>>> alUser=, issueDate=1511262017635, maxDate=1511866817635,
>>>> sequenceNumber=1854601, masterKeyId=3392) is expired
>>>>
>>>> at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.
>>>> getHiveRecordWriter(HiveFileFormatUtils.java:248)
>>>>
>>>> at org.apache.spark.sql.hive.
>>>> SparkHiveDynamicPartitionWriterContainer.newOutputWriter$1(
>>>> hiveWriterContainers.scala:346)
>>>>
>>>> at org.apache.spark.sql.hive.
>>>> SparkHiveDynamicPartitionWriterContainer.writeToFile(
>>>> hiveWriterContainers.scala:304)
>>>>
>>>> ... 8 more
>>>>
>>>> Caused by: java.io.IOException: org.apache.hadoop.security.
>>>> authentication.client.AuthenticationException:
>>>> org.apache.hadoop.securit\
>>>>
>>>> y.token.SecretManager$InvalidToken: token (kms-dt owner=va_dflt,
>>>> renewer=yarn, realUser=, issueDate=1511262017635, maxDate=15118668\
>>>>
>>>> 17635, sequenceNumber=1854601, masterKeyId=3392) is expired
>>>>
>>>> at org.apache.hadoop.crypto.key.kms.
>>>> LoadBalancingKMSClientProvider.decryptEncryptedKey(
>>>> LoadBalancingKMSClientProvider.java:216)
>>>>
>>>> at org.apache.hadoop.crypto.key.
>>>> KeyProviderCryptoExtension.decryptEncryptedKey(
>>>> KeyProviderCryptoExtension.java:388)
>>>>
>>>> at org.apache.hadoop.hdfs.DFSClient.
>>>> decryptEncryptedDataEncryptionKey(DFSClient.java:1440)
>>>>
>>>&g

Re: Spark Streaming Kerberos Issue

2017-11-22 Thread KhajaAsmath Mohammed
I passed a keytab. Renewal is enabled via a script that runs every eight hours
and renews the user's credentials.

On Wed, Nov 22, 2017 at 12:27 PM, Georg Heiler <georg.kf.hei...@gmail.com>
wrote:

> Did you pass a keytab? Is renewal enabled in your kdc?
> KhajaAsmath Mohammed <mdkhajaasm...@gmail.com> schrieb am Mi. 22. Nov.
> 2017 um 19:25:
>
>> Hi,
>>
>> I have written spark stream job and job is running successfully for more
>> than 36 hours. After around 36 hours job gets failed with kerberos issue.
>> Any solution on how to resolve it.
>>
>> org.apache.spark.SparkException: Task failed while wri\
>>
>> ting rows.
>>
>> at org.apache.spark.sql.hive.
>> SparkHiveDynamicPartitionWriterContainer.writeToFile(
>> hiveWriterContainers.scala:328)
>>
>> at org.apache.spark.sql.hive.
>> execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.
>> apply(InsertIntoHiveTable.scala:210)
>>
>> at org.apache.spark.sql.hive.
>> execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.
>> apply(InsertIntoHiveTable.scala:210)
>>
>> at org.apache.spark.scheduler.
>> ResultTask.runTask(ResultTask.scala:87)
>>
>> at org.apache.spark.scheduler.Task.run(Task.scala:99)
>>
>> at org.apache.spark.executor.Executor$TaskRunner.run(
>> Executor.scala:322)
>>
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(
>> ThreadPoolExecutor.java:1145)
>>
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>> ThreadPoolExecutor.java:615)
>>
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Caused by: org.apache.hadoop.hive.ql.metadata.HiveException:
>> java.io.IOException: org.apache.hadoop.security.authentication.client.\
>>
>> AuthenticationException: 
>> org.apache.hadoop.security.token.SecretManager$InvalidToken:
>> token (kms-dt owner=va_dflt, renewer=yarn, re\
>>
>> alUser=, issueDate=1511262017635, maxDate=1511866817635,
>> sequenceNumber=1854601, masterKeyId=3392) is expired
>>
>> at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.
>> getHiveRecordWriter(HiveFileFormatUtils.java:248)
>>
>> at org.apache.spark.sql.hive.
>> SparkHiveDynamicPartitionWriterContainer.newOutputWriter$1(
>> hiveWriterContainers.scala:346)
>>
>> at org.apache.spark.sql.hive.
>> SparkHiveDynamicPartitionWriterContainer.writeToFile(
>> hiveWriterContainers.scala:304)
>>
>> ... 8 more
>>
>> Caused by: java.io.IOException: org.apache.hadoop.security.
>> authentication.client.AuthenticationException: org.apache.hadoop.securit\
>>
>> y.token.SecretManager$InvalidToken: token (kms-dt owner=va_dflt,
>> renewer=yarn, realUser=, issueDate=1511262017635, maxDate=15118668\
>>
>> 17635, sequenceNumber=1854601, masterKeyId=3392) is expired
>>
>> at org.apache.hadoop.crypto.key.kms.
>> LoadBalancingKMSClientProvider.decryptEncryptedKey(
>> LoadBalancingKMSClientProvider.java:216)
>>
>> at org.apache.hadoop.crypto.key.
>> KeyProviderCryptoExtension.decryptEncryptedKey(
>> KeyProviderCryptoExtension.java:388)
>>
>> at org.apache.hadoop.hdfs.DFSClient.
>> decryptEncryptedDataEncryptionKey(DFSClient.java:1440)
>>
>> at org.apache.hadoop.hdfs.DFSClient.
>> createWrappedOutputStream(DFSClient.java:1542)
>>
>> at org.apache.hadoop.hdfs.DFSClient.
>> createWrappedOutputStream(DFSClient.java:1527)
>>
>> at org.apache.hadoop.hdfs.DistributedFileSystem$7.
>> doCall(DistributedFileSystem.java:428)
>>
>> at org.apache.hadoop.hdfs.DistributedFileSystem$7.
>> doCall(DistributedFileSystem.java:421)
>>
>> at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(
>> FileSystemLinkResolver.java:81)
>>
>> at org.apache.hadoop.hdfs.DistributedFileSystem.create(
>> DistributedFileSystem.java:421)
>>
>> at org.apache.hadoop.hdfs.DistributedFileSystem.create(
>> DistributedFileSystem.java:362)
>>
>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.
>> java:925)
>>
>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.
>> java:906)
>>
>> at parquet.hadoop.ParquetFileWriter.(
>> ParquetFileWri

Spark Streaming Kerberos Issue

2017-11-22 Thread KhajaAsmath Mohammed
Hi,

I have written a Spark streaming job and it runs successfully for more than 36
hours. After around 36 hours the job fails with a Kerberos issue. Any solution
for how to resolve it?

org.apache.spark.SparkException: Task failed while wri\

ting rows.

at
org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:328)

at
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:210)

at
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:210)

at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)

at org.apache.spark.scheduler.Task.run(Task.scala:99)

at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)

Caused by: org.apache.hadoop.hive.ql.metadata.HiveException:
java.io.IOException: org.apache.hadoop.security.authentication.client.\

AuthenticationException:
org.apache.hadoop.security.token.SecretManager$InvalidToken: token (kms-dt
owner=va_dflt, renewer=yarn, re\

alUser=, issueDate=1511262017635, maxDate=1511866817635,
sequenceNumber=1854601, masterKeyId=3392) is expired

at
org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:248)

at
org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.newOutputWriter$1(hiveWriterContainers.scala:346)

at
org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:304)

... 8 more

Caused by: java.io.IOException:
org.apache.hadoop.security.authentication.client.AuthenticationException:
org.apache.hadoop.securit\

y.token.SecretManager$InvalidToken: token (kms-dt owner=va_dflt,
renewer=yarn, realUser=, issueDate=1511262017635, maxDate=15118668\

17635, sequenceNumber=1854601, masterKeyId=3392) is expired

at
org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider.decryptEncryptedKey(LoadBalancingKMSClientProvider.java:216)

at
org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.decryptEncryptedKey(KeyProviderCryptoExtension.java:388)

at
org.apache.hadoop.hdfs.DFSClient.decryptEncryptedDataEncryptionKey(DFSClient.java:1440)

at
org.apache.hadoop.hdfs.DFSClient.createWrappedOutputStream(DFSClient.java:1542)

at
org.apache.hadoop.hdfs.DFSClient.createWrappedOutputStream(DFSClient.java:1527)

at
org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:428)

at
org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:421)

at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)

at
org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:421)

at
org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:362)

at
org.apache.hadoop.fs.FileSystem.create(FileSystem.java:925)

at
org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)

at
parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:220)

at
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:311)

at
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:287)

at
org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.<init>(ParquetRecordWriterWrapper.java:65)

at
org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getParquerRecordWriterWrapper(MapredParquetOutputFormat.java:125)

at
org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getHiveRecordWriter(MapredParquetOutputFormat.java:114)

at
org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:260)

at
org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:245)

... 10 more

Caused by:
org.apache.hadoop.security.authentication.client.AuthenticationException:
org.apache.hadoop.security.token.SecretManager\

$InvalidToken: token (kms-dt owner=va_dflt, renewer=yarn, realUser=,
issueDate=1511262017635, maxDate=1511866817635, sequenceNumber\

=1854601, masterKeyId=3392) is expired

at
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

at

Spark Streaming Hive Dynamic Partitions Issue

2017-11-22 Thread KhajaAsmath Mohammed
Hi,

I am able to write data into Hive tables from Spark streaming. The job ran
successfully for 37 hours and then I started getting task-failure errors as
below. The Hive table has data up until the point the tasks started failing.

Job aborted due to stage failure: Task 0 in stage 691.0 failed 4 times,
most recent failure: Lost task 0.3 in stage 691.0 (TID 10884,
brksvl171.brk.navistar.com, executor 2): org.apache.spark.SparkException:
Task failed while writing rows.

 at
org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:328)

 at
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:210)

 at
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:210)

 at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)

 at org.apache.spark.scheduler.Task.run(Task.scala:99)

 at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)

 at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

 at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.NullPointerException

 at
parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:152)

 at
parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:111)

 at
parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)

 at
org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.close(ParquetRecordWriterWrapper.java:102)

 at
org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.close(ParquetRecordWriterWrapper.java:119)

 at
org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:320)

 ... 8 more



Driver stacktrace:


Any solution for this, please?


Thanks,

Asmath


Spark Streaming in Wait mode

2017-11-17 Thread KhajaAsmath Mohammed
Hi,

I am running a Spark streaming job and it is not picking up the next batches,
but the job still shows as running on YARN.

Is this expected behavior when there is no data, or is it just waiting for data
to pick up?

I am almost 4 hours of batches behind (30-minute interval).


[image: Inline image 1]

[image: Inline image 2]

hadoop.security.authentication=kerberos
spark.executor.memory=12g
spark.driver.am.memory=8G
spark.yarn.am.memoryOverhead=8g
spark.scheduler.mode=FAIR
spark.shuffle.compress=true
spark.shuffle.spill.compress=true
spark.broadcast.compress=true
spark.io.compression.codec=snappy
spark.dynamicAllocation.enabled=false
spark.streaming.dynamicAllocation.enabled=true
## HIVE JDBC ##
java.security.krb5.conf=krb5.conf
javax.security.auth.useSubjectCredsOnly=true
hive.jdbc.url=jdbc:Xa;principal=hive/_h...@ad.navistar.com;ssl=true
hive.jdbc.driver=org.apache.hive.jdbc.HiveDriver
keytab.file=va_dflt.keytab
spark.sql.parquet.binaryAsString=true
spark.sql.parquet.mergeSchema=true
spark.sql.parquet.compression.codec=snappy
spark.rdd.compress=true
spark.io.compression.codec=snappy
spark.sql.tungsten.enabled=false
spark.sql.codegen=false
spark.sql.unsafe.enabled=false
index=15
includeIndex=true
BatchInterval=1800
CheckPointDir=hdfs://prodnameservice1/user/yyy1k78/KafkaCheckPoint
KafkaBrokerList=XXX
KafkaTopics=occlocation
###33
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.locality.wait=10
spark.task.maxFailures=8
spark.ui.killEnabled=false
spark.logConf=true
# SPARK STREAMING CONFIGURATION
spark.streaming.blockInterval=200
spark.streaming.receiver.writeAheadLog.enable=true
spark.streaming.backpressure.enabled=true
#spark.streaming.backpressure.pid.minRate=10
#spark.streaming.receiver.maxRate=100
#spark.streaming.kafka.maxRatePerPartition==100
#spark.streaming.backpressure.initialRate=30
spark.yarn.maxAppAttempts=8
spark.yarn.am.attemptFailuresValidityInterval=1h
spark.yarn.executor.failuresValidityInterval=1h

Any suggestions on why the batches are not running? Is this expected behavior?

Thanks,
Asmath


Struct Type

2017-11-17 Thread KhajaAsmath Mohammed
Hi,

I have the following schema in a DataFrame and I want to extract the key that
matches "MaxSpeed" from the array, along with its corresponding value.

|-- tags: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- key: string (nullable = true)
 |||-- value: string (nullable = true)

Is there any way to achieve this with the DataFrame API?

Thanks,
Asmath
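
A small Scala sketch, assuming the DataFrame is named df and the array column is
the tags field shown above: explode the array into one row per element, then
filter on the struct's key.

import org.apache.spark.sql.functions.{col, explode}

val maxSpeed = df
  .select(col("*"), explode(col("tags")).as("tag"))   // one row per array element
  .filter(col("tag.key") === "MaxSpeed")
  .select(col("tag.key").as("key"), col("tag.value").as("value"))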


Re: Spark Streaming Job completed without executing next batches

2017-11-16 Thread KhajaAsmath Mohammed
Here is a screenshot. The status shows Finished, but it should still be running
so the next batch can pick up the data.


[image: Inline image 1]

On Thu, Nov 16, 2017 at 10:01 PM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Hi,
>
> I have scheduled spark streaming job to run every 30 minutes and it was
> running fine till 32 hours and suddenly I see status of Finsished instead
> of running (Since it always run in background and shows up in resource
> manager)
>
> Am i doing anything wrong here? how come job was finished without picking
> next bacth from kafka.
>
> I run using below command in cloudera cluster.
>
> spark2-submit --class com.telematics.datascience.drivers.OCCDataPointDriver
> --master yarn --queue hadvaoccx_dse_pool --principal va_d...@ad.nav.com
> --keytab ./va_dflt.keytab  Telematics.jar -c /home/yyy1k78/occtelematics/
> application-datapoint-hdfs-dyn.properties &
>
> Thanks,
> Asmath
>


Spark Streaming Job completed without executing next batches

2017-11-16 Thread KhajaAsmath Mohammed
Hi,

I have scheduled a Spark streaming job to run every 30 minutes, and it was
running fine for 32 hours until I suddenly saw a status of Finished instead of
Running (it always runs in the background and shows up in the resource
manager).

Am I doing anything wrong here? How come the job finished without picking up
the next batch from Kafka?

I run using below command in cloudera cluster.

spark2-submit --class com.telematics.datascience.drivers.OCCDataPointDriver
--master yarn --queue hadvaoccx_dse_pool --principal va_d...@ad.nav.com
--keytab ./va_dflt.keytab  Telematics.jar -c
/home/yyy1k78/occtelematics/application-datapoint-hdfs-dyn.properties &

Thanks,
Asmath


Restart Spark Streaming after deployment

2017-11-15 Thread KhajaAsmath Mohammed
Hi,

I am new to Spark streaming. I have developed a Spark streaming job which runs
every 30 minutes with a checkpoint directory.

I have to implement a minor change. Should I kill the Spark streaming job once
the current batch completes, using the yarn application -kill command, and then
update the jar file?

The question I have is: if I follow the above approach, will Spark streaming
pick up data from the offsets saved in the checkpoint after the restart?

Are there any better approaches? Thanks in advance for your suggestions.

Thanks,
Asmath


Spark Streaming in Spark 2.1 with Kafka 0.9

2017-11-09 Thread KhajaAsmath Mohammed
Hi,

I have not been successful using Spark 2.1 with Kafka 0.9. Can anyone please
share a code snippet for how to use it?

val sparkSession: SparkSession = runMode match {
  case "local" => SparkSession.builder.config(sparkConfig).getOrCreate
  case "yarn"  =>
SparkSession.builder.config(sparkConfig).enableHiveSupport.getOrCreate
}
val streamingContext = new StreamingContext(sparkSession.sparkContext,
Seconds(20))
println("streamingContext ->"+streamingContext)
streamingContext.checkpoint(config.getString(Constants.Properties.CheckPointDir))
println("topics
->"+config.getString(Constants.Properties.KafkaBrokerList))

val messages = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](streamingContext, kafkaParams,
topicsSet)


With the above code, the job gets aborted.





I used the 0.10 code snippet too, but no luck.


val streamingContext = new StreamingContext(sparkConfig, Seconds(20))
println("streamingContext ->"+streamingContext)
streamingContext.checkpoint(config.getString(Constants.Properties.CheckPointDir))
println("topics
->"+config.getString(Constants.Properties.KafkaBrokerList))

//val messages = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](streamingContext, kafkaParams,
topicsSet)

val messages = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))


Any suggestions on how to use Spark 2.1 with Kafka streaming?

Thanks,

Asmath


Spark Streaming Small files in Hive

2017-10-29 Thread KhajaAsmath Mohammed
Hi,

I am using spark streaming to write data back into hive with the below code
snippet


eventHubsWindowedStream.map(x => EventContent(new String(x)))

  .foreachRDD(rdd => {

val sparkSession = SparkSession
.builder.enableHiveSupport.getOrCreate

import sparkSession.implicits._

rdd.toDS.write.mode(org.apache.spark.sql.SaveMode.Append
).insertInto(hiveTableName)

  })

The Hive table is partitioned by year, month, and day, so we end up with little
data for some days, which in turn results in many small files inside Hive. Since
the data is written as small files, there is a significant performance hit on
Impala/Hive when reading it. Is there a way to merge files while inserting data
into Hive?

It would also be really helpful if anyone could suggest a better way to design
this. We cannot use HBase/Kudu in this scenario due to space issues on the
clusters.

Thanks,

Asmath
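
A hedged sketch of one mitigation: shrink the number of files each micro-batch
writes by coalescing before the insert (the target of 4 files per batch is an
arbitrary assumption). A periodic compaction job that rewrites older partitions
with INSERT OVERWRITE is the usual complement when individual batches stay
small.

eventHubsWindowedStream.map(x => EventContent(new String(x)))
  .foreachRDD(rdd => {
    val sparkSession = SparkSession.builder.enableHiveSupport.getOrCreate
    import sparkSession.implicits._

    rdd.toDS
      .coalesce(4)                                 // fewer, larger files per batch
      .write.mode(org.apache.spark.sql.SaveMode.Append)
      .insertInto(hiveTableName)
  })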


Re: Structured Stream in Spark

2017-10-27 Thread KhajaAsmath Mohammed
Yes, I checked both the output location and the console. Neither has any data.

The link below also has the code and the question that I raised with Azure
HDInsight.

https://github.com/Azure/spark-eventhubs/issues/195


On Fri, Oct 27, 2017 at 3:22 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com
> wrote:

> The codes in the link write the data into files. Did you check the output
> location?
>
> By the way, if you want to see the data on the console, you can use the
> console sink by changing this line *format("parquet").option("path",
> outputPath + "/ETL").partitionBy("creationTime").start()* to
> *format("console").start().*
>
> On Fri, Oct 27, 2017 at 8:41 AM, KhajaAsmath Mohammed <
> mdkhajaasm...@gmail.com> wrote:
>
>> Hi TathagataDas,
>>
>> I was trying to use eventhub with spark streaming. Looks like I was able
>> to make connection successfully but cannot see any data on the console. Not
>> sure if eventhub is supported or not.
>>
>> https://github.com/Azure/spark-eventhubs/blob/master/example
>> s/src/main/scala/com/microsoft/spark/sql/examples/EventHubsS
>> tructuredStreamingExample.scala
>> is the code snippet I have used to connect to eventhub
>>
>> Thanks,
>> Asmath
>>
>>
>>
>> On Thu, Oct 26, 2017 at 9:39 AM, KhajaAsmath Mohammed <
>> mdkhajaasm...@gmail.com> wrote:
>>
>>> Thanks TD.
>>>
>>> On Wed, Oct 25, 2017 at 6:42 PM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
>>>> Please do not confuse old Spark Streaming (DStreams) with Structured
>>>> Streaming. Structured Streaming's offset and checkpoint management is far
>>>> more robust than DStreams.
>>>> Take a look at my talk - https://spark-summit.org/201
>>>> 7/speakers/tathagata-das/
>>>>
>>>> On Wed, Oct 25, 2017 at 9:29 PM, KhajaAsmath Mohammed <
>>>> mdkhajaasm...@gmail.com> wrote:
>>>>
>>>>> Thanks Subhash.
>>>>>
>>>>> Have you ever used zero data loss concept with streaming. I am bit
>>>>> worried to use streamig when it comes to data loss.
>>>>>
>>>>> https://blog.cloudera.com/blog/2017/06/offset-management-for
>>>>> -apache-kafka-with-apache-spark-streaming/
>>>>>
>>>>>
>>>>> does structured streaming handles it internally?
>>>>>
>>>>> On Wed, Oct 25, 2017 at 3:10 PM, Subhash Sriram <
>>>>> subhash.sri...@gmail.com> wrote:
>>>>>
>>>>>> No problem! Take a look at this:
>>>>>>
>>>>>> http://spark.apache.org/docs/latest/structured-streaming-pro
>>>>>> gramming-guide.html#recovering-from-failures-with-checkpointing
>>>>>>
>>>>>> Thanks,
>>>>>> Subhash
>>>>>>
>>>>>> On Wed, Oct 25, 2017 at 4:08 PM, KhajaAsmath Mohammed <
>>>>>> mdkhajaasm...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Sriram,
>>>>>>>
>>>>>>> Thanks. This is what I was looking for.
>>>>>>>
>>>>>>> one question, where do we need to specify the checkpoint directory
>>>>>>> in case of structured streaming?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Asmath
>>>>>>>
>>>>>>> On Wed, Oct 25, 2017 at 2:52 PM, Subhash Sriram <
>>>>>>> subhash.sri...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Asmath,
>>>>>>>>
>>>>>>>> Here is an example of using structured streaming to read from Kafka:
>>>>>>>>
>>>>>>>> https://github.com/apache/spark/blob/master/examples/src/mai
>>>>>>>> n/scala/org/apache/spark/examples/sql/streaming/StructuredKa
>>>>>>>> fkaWordCount.scala
>>>>>>>>
>>>>>>>> In terms of parsing the JSON, there is a from_json function that
>>>>>>>> you can use. The following might help:
>>>>>>>>
>>>>>>>> https://databricks.com/blog/2017/02/23/working-complex-data-
>>>>>>>> formats-structured-streaming-apache-spark-2-1.html
>>>>>>>>
>>>>>>>> I hope this helps.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Subhash
>>>>>>>>
>>>>>>>> On Wed, Oct 25, 2017 at 2:59 PM, KhajaAsmath Mohammed <
>>>>>>>> mdkhajaasm...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Could anyone provide suggestions on how to parse json data from
>>>>>>>>> kafka and load it back in hive.
>>>>>>>>>
>>>>>>>>> I have read about structured streaming but didn't find any
>>>>>>>>> examples. is there any best practise on how to read it and parse it 
>>>>>>>>> with
>>>>>>>>> structured streaming for this use case?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Asmath
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Structured Stream in Spark

2017-10-27 Thread KhajaAsmath Mohammed
Hi TathagataDas,

I was trying to use Event Hubs with Spark streaming. It looks like I was able to
make the connection successfully, but I cannot see any data on the console. I am
not sure whether Event Hubs is supported or not.

https://github.com/Azure/spark-eventhubs/blob/master/examples/src/main/scala/com/microsoft/spark/sql/examples/EventHubsStructuredStreamingExample.scala

is the code snippet I have used to connect to eventhub

Thanks,
Asmath



On Thu, Oct 26, 2017 at 9:39 AM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Thanks TD.
>
> On Wed, Oct 25, 2017 at 6:42 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Please do not confuse old Spark Streaming (DStreams) with Structured
>> Streaming. Structured Streaming's offset and checkpoint management is far
>> more robust than DStreams.
>> Take a look at my talk - https://spark-summit.org/201
>> 7/speakers/tathagata-das/
>>
>> On Wed, Oct 25, 2017 at 9:29 PM, KhajaAsmath Mohammed <
>> mdkhajaasm...@gmail.com> wrote:
>>
>>> Thanks Subhash.
>>>
>>> Have you ever used zero data loss concept with streaming. I am bit
>>> worried to use streamig when it comes to data loss.
>>>
>>> https://blog.cloudera.com/blog/2017/06/offset-management-for
>>> -apache-kafka-with-apache-spark-streaming/
>>>
>>>
>>> does structured streaming handles it internally?
>>>
>>> On Wed, Oct 25, 2017 at 3:10 PM, Subhash Sriram <
>>> subhash.sri...@gmail.com> wrote:
>>>
>>>> No problem! Take a look at this:
>>>>
>>>> http://spark.apache.org/docs/latest/structured-streaming-pro
>>>> gramming-guide.html#recovering-from-failures-with-checkpointing
>>>>
>>>> Thanks,
>>>> Subhash
>>>>
>>>> On Wed, Oct 25, 2017 at 4:08 PM, KhajaAsmath Mohammed <
>>>> mdkhajaasm...@gmail.com> wrote:
>>>>
>>>>> Hi Sriram,
>>>>>
>>>>> Thanks. This is what I was looking for.
>>>>>
>>>>> one question, where do we need to specify the checkpoint directory in
>>>>> case of structured streaming?
>>>>>
>>>>> Thanks,
>>>>> Asmath
>>>>>
>>>>> On Wed, Oct 25, 2017 at 2:52 PM, Subhash Sriram <
>>>>> subhash.sri...@gmail.com> wrote:
>>>>>
>>>>>> Hi Asmath,
>>>>>>
>>>>>> Here is an example of using structured streaming to read from Kafka:
>>>>>>
>>>>>> https://github.com/apache/spark/blob/master/examples/src/mai
>>>>>> n/scala/org/apache/spark/examples/sql/streaming/StructuredKa
>>>>>> fkaWordCount.scala
>>>>>>
>>>>>> In terms of parsing the JSON, there is a from_json function that you
>>>>>> can use. The following might help:
>>>>>>
>>>>>> https://databricks.com/blog/2017/02/23/working-complex-data-
>>>>>> formats-structured-streaming-apache-spark-2-1.html
>>>>>>
>>>>>> I hope this helps.
>>>>>>
>>>>>> Thanks,
>>>>>> Subhash
>>>>>>
>>>>>> On Wed, Oct 25, 2017 at 2:59 PM, KhajaAsmath Mohammed <
>>>>>> mdkhajaasm...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Could anyone provide suggestions on how to parse json data from
>>>>>>> kafka and load it back in hive.
>>>>>>>
>>>>>>> I have read about structured streaming but didn't find any examples.
>>>>>>> is there any best practise on how to read it and parse it with 
>>>>>>> structured
>>>>>>> streaming for this use case?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Asmath
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Structured streaming with event hubs

2017-10-27 Thread KhajaAsmath Mohammed
I was looking at this example but didn't get any output from it when I used it.

https://github.com/Azure/spark-eventhubs/blob/master/examples/src/main/scala/com/microsoft/spark/sql/examples/EventHubsStructuredStreamingExample.scala



On Fri, Oct 27, 2017 at 9:18 AM, ayan guha <guha.a...@gmail.com> wrote:

> Does event hub support seuctured streaming at all yet?
>
> On Fri, 27 Oct 2017 at 1:43 pm, KhajaAsmath Mohammed <
> mdkhajaasm...@gmail.com> wrote:
>
>> Hi,
>>
>> Could anyone share if there is any code snippet on how to use spark
>> structured streaming with event hubs ??
>>
>> Thanks,
>> Asmath
>>
>> Sent from my iPhone
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>> --
> Best Regards,
> Ayan Guha
>


Structured streaming with event hubs

2017-10-26 Thread KhajaAsmath Mohammed
Hi,

Could anyone share a code snippet for how to use Spark structured streaming
with Event Hubs?

Thanks,
Asmath

Sent from my iPhone



Re: Structured Stream in Spark

2017-10-26 Thread KhajaAsmath Mohammed
Thanks TD.

On Wed, Oct 25, 2017 at 6:42 PM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> Please do not confuse old Spark Streaming (DStreams) with Structured
> Streaming. Structured Streaming's offset and checkpoint management is far
> more robust than DStreams.
> Take a look at my talk - https://spark-summit.org/
> 2017/speakers/tathagata-das/
>
> On Wed, Oct 25, 2017 at 9:29 PM, KhajaAsmath Mohammed <
> mdkhajaasm...@gmail.com> wrote:
>
>> Thanks Subhash.
>>
>> Have you ever used zero data loss concept with streaming. I am bit
>> worried to use streamig when it comes to data loss.
>>
>> https://blog.cloudera.com/blog/2017/06/offset-management-
>> for-apache-kafka-with-apache-spark-streaming/
>>
>>
>> does structured streaming handles it internally?
>>
>> On Wed, Oct 25, 2017 at 3:10 PM, Subhash Sriram <subhash.sri...@gmail.com
>> > wrote:
>>
>>> No problem! Take a look at this:
>>>
>>> http://spark.apache.org/docs/latest/structured-streaming-pro
>>> gramming-guide.html#recovering-from-failures-with-checkpointing
>>>
>>> Thanks,
>>> Subhash
>>>
>>> On Wed, Oct 25, 2017 at 4:08 PM, KhajaAsmath Mohammed <
>>> mdkhajaasm...@gmail.com> wrote:
>>>
>>>> Hi Sriram,
>>>>
>>>> Thanks. This is what I was looking for.
>>>>
>>>> one question, where do we need to specify the checkpoint directory in
>>>> case of structured streaming?
>>>>
>>>> Thanks,
>>>> Asmath
>>>>
>>>> On Wed, Oct 25, 2017 at 2:52 PM, Subhash Sriram <
>>>> subhash.sri...@gmail.com> wrote:
>>>>
>>>>> Hi Asmath,
>>>>>
>>>>> Here is an example of using structured streaming to read from Kafka:
>>>>>
>>>>> https://github.com/apache/spark/blob/master/examples/src/mai
>>>>> n/scala/org/apache/spark/examples/sql/streaming/StructuredKa
>>>>> fkaWordCount.scala
>>>>>
>>>>> In terms of parsing the JSON, there is a from_json function that you
>>>>> can use. The following might help:
>>>>>
>>>>> https://databricks.com/blog/2017/02/23/working-complex-data-
>>>>> formats-structured-streaming-apache-spark-2-1.html
>>>>>
>>>>> I hope this helps.
>>>>>
>>>>> Thanks,
>>>>> Subhash
>>>>>
>>>>> On Wed, Oct 25, 2017 at 2:59 PM, KhajaAsmath Mohammed <
>>>>> mdkhajaasm...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Could anyone provide suggestions on how to parse json data from kafka
>>>>>> and load it back in hive.
>>>>>>
>>>>>> I have read about structured streaming but didn't find any examples.
>>>>>> is there any best practise on how to read it and parse it with structured
>>>>>> streaming for this use case?
>>>>>>
>>>>>> Thanks,
>>>>>> Asmath
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Structured Stream in Spark

2017-10-25 Thread KhajaAsmath Mohammed
Thanks Subhash.

Have you ever used the zero-data-loss approach with streaming? I am a bit
worried about using streaming when it comes to data loss.

https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/


does structured streaming handles it internally?

On Wed, Oct 25, 2017 at 3:10 PM, Subhash Sriram <subhash.sri...@gmail.com>
wrote:

> No problem! Take a look at this:
>
> http://spark.apache.org/docs/latest/structured-streaming-
> programming-guide.html#recovering-from-failures-with-checkpointing
>
> Thanks,
> Subhash
>
> On Wed, Oct 25, 2017 at 4:08 PM, KhajaAsmath Mohammed <
> mdkhajaasm...@gmail.com> wrote:
>
>> Hi Sriram,
>>
>> Thanks. This is what I was looking for.
>>
>> one question, where do we need to specify the checkpoint directory in
>> case of structured streaming?
>>
>> Thanks,
>> Asmath
>>
>> On Wed, Oct 25, 2017 at 2:52 PM, Subhash Sriram <subhash.sri...@gmail.com
>> > wrote:
>>
>>> Hi Asmath,
>>>
>>> Here is an example of using structured streaming to read from Kafka:
>>>
>>> https://github.com/apache/spark/blob/master/examples/src/mai
>>> n/scala/org/apache/spark/examples/sql/streaming/StructuredKa
>>> fkaWordCount.scala
>>>
>>> In terms of parsing the JSON, there is a from_json function that you can
>>> use. The following might help:
>>>
>>> https://databricks.com/blog/2017/02/23/working-complex-data-
>>> formats-structured-streaming-apache-spark-2-1.html
>>>
>>> I hope this helps.
>>>
>>> Thanks,
>>> Subhash
>>>
>>> On Wed, Oct 25, 2017 at 2:59 PM, KhajaAsmath Mohammed <
>>> mdkhajaasm...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Could anyone provide suggestions on how to parse json data from kafka
>>>> and load it back in hive.
>>>>
>>>> I have read about structured streaming but didn't find any examples. is
>>>> there any best practise on how to read it and parse it with structured
>>>> streaming for this use case?
>>>>
>>>> Thanks,
>>>> Asmath
>>>>
>>>
>>>
>>
>


Re: Structured Stream in Spark

2017-10-25 Thread KhajaAsmath Mohammed
Hi Sriram,

Thanks. This is what I was looking for.

One question: where do we need to specify the checkpoint directory in the case
of structured streaming?

Thanks,
Asmath

On Wed, Oct 25, 2017 at 2:52 PM, Subhash Sriram <subhash.sri...@gmail.com>
wrote:

> Hi Asmath,
>
> Here is an example of using structured streaming to read from Kafka:
>
> https://github.com/apache/spark/blob/master/examples/
> src/main/scala/org/apache/spark/examples/sql/streaming/
> StructuredKafkaWordCount.scala
>
> In terms of parsing the JSON, there is a from_json function that you can
> use. The following might help:
>
> https://databricks.com/blog/2017/02/23/working-complex-
> data-formats-structured-streaming-apache-spark-2-1.html
>
> I hope this helps.
>
> Thanks,
> Subhash
>
> On Wed, Oct 25, 2017 at 2:59 PM, KhajaAsmath Mohammed <
> mdkhajaasm...@gmail.com> wrote:
>
>> Hi,
>>
>> Could anyone provide suggestions on how to parse json data from kafka and
>> load it back in hive.
>>
>> I have read about structured streaming but didn't find any examples. is
>> there any best practise on how to read it and parse it with structured
>> streaming for this use case?
>>
>> Thanks,
>> Asmath
>>
>
>


Structured Stream in Spark

2017-10-25 Thread KhajaAsmath Mohammed
Hi,

Could anyone provide suggestions on how to parse JSON data from Kafka and
load it into Hive?

I have read about structured streaming but didn't find any examples. Is there a
best practice for how to read and parse the data with structured streaming for
this use case?

Thanks,
Asmath
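
A minimal Scala sketch of the Kafka-to-Hive path with structured streaming
(Spark 2.1+ with the spark-sql-kafka-0-10 source, which needs Kafka brokers on
0.10 or later). The schema, broker, topic, and paths are assumptions, and the
sink writes parquet files that an external Hive table can be defined over, since
Spark 2.1 had no direct Hive streaming sink; sparkSession is an existing
SparkSession.

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

val schema = new StructType()                            // assumed message schema
  .add("vin", StringType)
  .add("utctime", TimestampType)
  .add("speed", DoubleType)

val parsed = sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")     // assumed broker list
  .option("subscribe", "occlocation")                    // assumed topic
  .load()
  .selectExpr("CAST(value AS STRING) AS json")
  .select(from_json(col("json"), schema).as("data"))
  .select("data.*")

val query = parsed.writeStream
  .format("parquet")
  .option("path", "/warehouse/occlocation")                  // path backing an external Hive table
  .option("checkpointLocation", "/checkpoints/occlocation")  // required for the file sink
  .start()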


Re: Spark - Partitions

2017-10-17 Thread KhajaAsmath Mohammed
val unionDS = rawDS.union(processedDS)
  //unionDS.persist(StorageLevel.MEMORY_AND_DISK)
  val unionedDS = unionDS.dropDuplicates()
  //val
unionedPartitionedDS=unionedDS.repartition(unionedDS("year"),unionedDS("month"),unionedDS("day")).persist(StorageLevel.MEMORY_AND_DISK)
  //unionDS.persist(StorageLevel.MEMORY_AND_DISK)
  unionDS.repartition(numPartitions);
  unionDS.createOrReplaceTempView("datapoint_prq_union_ds_view")
  sparkSession.sql(s"set hive.exec.dynamic.partition.mode=nonstrict")
  val deltaDSQry = "insert overwrite table  datapoint
PARTITION(year,month,day) select VIN, utctime, description, descriptionuom,
providerdesc, dt_map, islocation, latitude, longitude, speed,
value,current_date,YEAR, MONTH, DAY from datapoint_prq_union_ds_view"
  println(deltaDSQry)
  sparkSession.sql(deltaDSQry)


Here is the code and also properties used in my project.
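
One detail worth noting in the snippet above: repartition returns a new
Dataset rather than modifying the one it is called on, so the unassigned
unionDS.repartition(numPartitions) line has no effect on the insert, and the
temp view is registered on unionDS rather than on the deduplicated unionedDS.
A hedged sketch of the coalesce-after-the-shuffle pattern discussed in this
thread, reusing the names from the snippet; 4 output partitions is an assumed
target, not a recommendation:

// Deduplicate first (the shuffle-producing step), then reduce the partition
// count and register *that* Dataset so the insert writes from few partitions.
// Note: coalesce avoids an extra shuffle but can lower the parallelism of the
// dedup stage; repartition(4) would keep the dedup parallel at the cost of one
// more shuffle.
val dedupedDS = rawDS.union(processedDS).dropDuplicates()
val compactedDS = dedupedDS.coalesce(4)

compactedDS.createOrReplaceTempView("datapoint_prq_union_ds_view")
sparkSession.sql("set hive.exec.dynamic.partition.mode=nonstrict")
sparkSession.sql(
  """insert overwrite table datapoint PARTITION(year,month,day)
    |select VIN, utctime, description, descriptionuom, providerdesc, dt_map,
    |       islocation, latitude, longitude, speed, value, current_date,
    |       YEAR, MONTH, DAY
    |from datapoint_prq_union_ds_view""".stripMargin)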


On Tue, Oct 17, 2017 at 3:38 PM, Sebastian Piu <sebastian@gmail.com>
wrote:

> Can you share some code?
>
> On Tue, 17 Oct 2017, 21:11 KhajaAsmath Mohammed, <mdkhajaasm...@gmail.com>
> wrote:
>
>> In my case I am just writing the data frame back to hive. so when is the
>> best case to repartition it. I did repartition before calling insert
>> overwrite on table
>>
>> On Tue, Oct 17, 2017 at 3:07 PM, Sebastian Piu <sebastian@gmail.com>
>> wrote:
>>
>>> You have to repartition/coalesce *after *the action that is causing the
>>> shuffle as that one will take the value you've set
>>>
>>> On Tue, Oct 17, 2017 at 8:40 PM KhajaAsmath Mohammed <
>>> mdkhajaasm...@gmail.com> wrote:
>>>
>>>> Yes still I see more number of part files and exactly the number I have
>>>> defined did spark.sql.shuffle.partitions
>>>>
>>>> Sent from my iPhone
>>>>
>>>> On Oct 17, 2017, at 2:32 PM, Michael Artz <michaelea...@gmail.com>
>>>> wrote:
>>>>
>>>> Have you tried caching it and using a coalesce?
>>>>
>>>>
>>>>
>>>> On Oct 17, 2017 1:47 PM, "KhajaAsmath Mohammed" <
>>>> mdkhajaasm...@gmail.com> wrote:
>>>>
>>>>> I tried repartitions but spark.sql.shuffle.partitions is taking up
>>>>> precedence over repartitions or coalesce. how to get the lesser number of
>>>>> files with same performance?
>>>>>
>>>>> On Fri, Oct 13, 2017 at 3:45 AM, Tushar Adeshara <
>>>>> tushar_adesh...@persistent.com> wrote:
>>>>>
>>>>>> You can also try coalesce as it will avoid full shuffle.
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> *Tushar Adeshara*
>>>>>>
>>>>>> *Technical Specialist – Analytics Practice*
>>>>>>
>>>>>> *Cell: +91-81490 04192 <+91%2081490%2004192>*
>>>>>>
>>>>>> *Persistent Systems** Ltd. **| **Partners in Innovation **|* 
>>>>>> *www.persistentsys.com
>>>>>> <http://www.persistentsys.com/>*
>>>>>>
>>>>>>
>>>>>> --
>>>>>> *From:* KhajaAsmath Mohammed <mdkhajaasm...@gmail.com>
>>>>>> *Sent:* 13 October 2017 09:35
>>>>>> *To:* user @spark
>>>>>> *Subject:* Spark - Partitions
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am reading hive query and wiriting the data back into hive after
>>>>>> doing some transformations.
>>>>>>
>>>>>> I have changed setting spark.sql.shuffle.partitions to 2000 and since
>>>>>> then job completes fast but the main problem is I am getting 2000 files 
>>>>>> for
>>>>>> each partition
>>>>>> size of file is 10 MB .
>>>>>>
>>>>>> is there a way to get same performance but write lesser number of
>>>>>> files ?
>>>>>>
>>>>>> I am trying repartition now but would like to know if there are any
>>>>>> other options.
>>>>>>
>>>>>> Thanks,
>>>>>> Asmath
>>>>>> DISCLAIMER
>>>>>> ==
>>>>>> This e-mail may contain privileged and confidential information which
>>>>>> is the property of Persistent Systems Ltd. It is intended only for the 
>>>>>> use
>>>>>> of the individual or entity to which it is addressed. If you are not the
>>>>>> intended recipient, you are not authorized to read, retain, copy, print,
>>>>>> distribute or use this message. If you have received this communication 
>>>>>> in
>>>>>> error, please notify the sender and delete all copies of this message.
>>>>>> Persistent Systems Ltd. does not accept any liability for virus infected
>>>>>> mails.
>>>>>>
>>>>>
>>>>>
>>


application-datapoint-hdfs-dyn.properties
Description: Binary data

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: Spark - Partitions

2017-10-17 Thread KhajaAsmath Mohammed
In my case I am just writing the DataFrame back to Hive, so when is the
best place to repartition it? I did repartition before calling insert
overwrite on the table.

On Tue, Oct 17, 2017 at 3:07 PM, Sebastian Piu <sebastian@gmail.com>
wrote:

> You have to repartition/coalesce *after *the action that is causing the
> shuffle as that one will take the value you've set
>
> On Tue, Oct 17, 2017 at 8:40 PM KhajaAsmath Mohammed <
> mdkhajaasm...@gmail.com> wrote:
>
>> Yes still I see more number of part files and exactly the number I have
>> defined did spark.sql.shuffle.partitions
>>
>> Sent from my iPhone
>>
>> On Oct 17, 2017, at 2:32 PM, Michael Artz <michaelea...@gmail.com> wrote:
>>
>> Have you tried caching it and using a coalesce?
>>
>>
>>
>> On Oct 17, 2017 1:47 PM, "KhajaAsmath Mohammed" <mdkhajaasm...@gmail.com>
>> wrote:
>>
>>> I tried repartitions but spark.sql.shuffle.partitions is taking up
>>> precedence over repartitions or coalesce. how to get the lesser number of
>>> files with same performance?
>>>
>>> On Fri, Oct 13, 2017 at 3:45 AM, Tushar Adeshara <
>>> tushar_adesh...@persistent.com> wrote:
>>>
>>>> You can also try coalesce as it will avoid full shuffle.
>>>>
>>>>
>>>> Regards,
>>>>
>>>> *Tushar Adeshara*
>>>>
>>>> *Technical Specialist – Analytics Practice*
>>>>
>>>> *Cell: +91-81490 04192 <+91%2081490%2004192>*
>>>>
>>>> *Persistent Systems** Ltd. **| **Partners in Innovation **|* 
>>>> *www.persistentsys.com
>>>> <http://www.persistentsys.com/>*
>>>>
>>>>
>>>> --
>>>> *From:* KhajaAsmath Mohammed <mdkhajaasm...@gmail.com>
>>>> *Sent:* 13 October 2017 09:35
>>>> *To:* user @spark
>>>> *Subject:* Spark - Partitions
>>>>
>>>> Hi,
>>>>
>>>> I am reading hive query and wiriting the data back into hive after
>>>> doing some transformations.
>>>>
>>>> I have changed setting spark.sql.shuffle.partitions to 2000 and since
>>>> then job completes fast but the main problem is I am getting 2000 files for
>>>> each partition
>>>> size of file is 10 MB .
>>>>
>>>> is there a way to get same performance but write lesser number of files
>>>> ?
>>>>
>>>> I am trying repartition now but would like to know if there are any
>>>> other options.
>>>>
>>>> Thanks,
>>>> Asmath
>>>> DISCLAIMER
>>>> ==
>>>> This e-mail may contain privileged and confidential information which
>>>> is the property of Persistent Systems Ltd. It is intended only for the use
>>>> of the individual or entity to which it is addressed. If you are not the
>>>> intended recipient, you are not authorized to read, retain, copy, print,
>>>> distribute or use this message. If you have received this communication in
>>>> error, please notify the sender and delete all copies of this message.
>>>> Persistent Systems Ltd. does not accept any liability for virus infected
>>>> mails.
>>>>
>>>
>>>


Re: Spark - Partitions

2017-10-17 Thread KhajaAsmath Mohammed
Yes, I still see a large number of part files, exactly the number I have
defined in spark.sql.shuffle.partitions.

Sent from my iPhone

> On Oct 17, 2017, at 2:32 PM, Michael Artz <michaelea...@gmail.com> wrote:
> 
> Have you tried caching it and using a coalesce? 
> 
> 
> 
>> On Oct 17, 2017 1:47 PM, "KhajaAsmath Mohammed" <mdkhajaasm...@gmail.com> 
>> wrote:
>> I tried repartitions but spark.sql.shuffle.partitions is taking up 
>> precedence over repartitions or coalesce. how to get the lesser number of 
>> files with same performance?
>> 
>>> On Fri, Oct 13, 2017 at 3:45 AM, Tushar Adeshara 
>>> <tushar_adesh...@persistent.com> wrote:
>>> You can also try coalesce as it will avoid full shuffle.
>>> 
>>> 
>>> Regards,
>>> Tushar Adeshara
>>> 
>>> Technical Specialist – Analytics Practice
>>> 
>>> Cell: +91-81490 04192
>>> 
>>> Persistent Systems Ltd. | Partners in Innovation | www.persistentsys.com
>>> 
>>> 
>>> From: KhajaAsmath Mohammed <mdkhajaasm...@gmail.com>
>>> Sent: 13 October 2017 09:35
>>> To: user @spark
>>> Subject: Spark - Partitions
>>>  
>>> Hi,
>>> 
>>> I am reading hive query and wiriting the data back into hive after doing 
>>> some transformations.
>>> 
>>> I have changed setting spark.sql.shuffle.partitions to 2000 and since then 
>>> job completes fast but the main problem is I am getting 2000 files for each 
>>> partition 
>>> size of file is 10 MB .
>>> 
>>> is there a way to get same performance but write lesser number of files ?
>>> 
>>> I am trying repartition now but would like to know if there are any other 
>>> options.
>>> 
>>> Thanks,
>>> Asmath
>>> DISCLAIMER
>>> ==
>>> This e-mail may contain privileged and confidential information which is 
>>> the property of Persistent Systems Ltd. It is intended only for the use of 
>>> the individual or entity to which it is addressed. If you are not the 
>>> intended recipient, you are not authorized to read, retain, copy, print, 
>>> distribute or use this message. If you have received this communication in 
>>> error, please notify the sender and delete all copies of this message. 
>>> Persistent Systems Ltd. does not accept any liability for virus infected 
>>> mails.
>> 


Re: Spark - Partitions

2017-10-17 Thread KhajaAsmath Mohammed
I tried repartition but spark.sql.shuffle.partitions is taking
precedence over repartition or coalesce. How can I get a smaller number of
files with the same performance?

On Fri, Oct 13, 2017 at 3:45 AM, Tushar Adeshara <
tushar_adesh...@persistent.com> wrote:

> You can also try coalesce as it will avoid full shuffle.
>
>
> Regards,
>
> *Tushar Adeshara*
>
> *Technical Specialist – Analytics Practice*
>
> *Cell: +91-81490 04192 <+91%2081490%2004192>*
>
> *Persistent Systems** Ltd. **| **Partners in Innovation **|* 
> *www.persistentsys.com
> <http://www.persistentsys.com/>*
>
>
> --
> *From:* KhajaAsmath Mohammed <mdkhajaasm...@gmail.com>
> *Sent:* 13 October 2017 09:35
> *To:* user @spark
> *Subject:* Spark - Partitions
>
> Hi,
>
> I am reading hive query and wiriting the data back into hive after doing
> some transformations.
>
> I have changed setting spark.sql.shuffle.partitions to 2000 and since then
> job completes fast but the main problem is I am getting 2000 files for each
> partition
> size of file is 10 MB .
>
> is there a way to get same performance but write lesser number of files ?
>
> I am trying repartition now but would like to know if there are any other
> options.
>
> Thanks,
> Asmath
> DISCLAIMER
> ==
> This e-mail may contain privileged and confidential information which is
> the property of Persistent Systems Ltd. It is intended only for the use of
> the individual or entity to which it is addressed. If you are not the
> intended recipient, you are not authorized to read, retain, copy, print,
> distribute or use this message. If you have received this communication in
> error, please notify the sender and delete all copies of this message.
> Persistent Systems Ltd. does not accept any liability for virus infected
> mails.
>


Spark - Partitions

2017-10-12 Thread KhajaAsmath Mohammed
Hi,

I am reading from a Hive query and writing the data back into Hive after
doing some transformations.

I have changed the setting spark.sql.shuffle.partitions to 2000 and since
then the job completes fast, but the main problem is that I am getting 2000
files for each partition, each about 10 MB in size.

Is there a way to get the same performance but write a smaller number of
files?

I am trying repartition now but would like to know if there are any other
options.

Thanks,
Asmath


cannot cast to double from spark row

2017-09-14 Thread KhajaAsmath Mohammed
Hi,

I am getting the below error when trying to cast a column value from a Spark
DataFrame row to double. Any ideas? I tried many solutions but none of them
worked.

 java.lang.ClassCastException: java.lang.String cannot be cast to
java.lang.Double

1. row.getAs[Double](Constants.Datapoint.Latitude)

2. row.getAs[String](Constants.Datapoint.Latitude).toDouble

I don't want to use row.getDouble(0) as the position of the column in the
file keeps changing.

Thanks,
Asmath
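
That ClassCastException means the value stored in the Row is a
java.lang.String, so getAs[Double] cannot succeed, and .toDouble on an empty
or malformed string will fail as well. A hedged sketch of two ways around it
that still look the column up by name; the literal "latitude" stands in for
Constants.Datapoint.Latitude:

import org.apache.spark.sql.Row
import scala.util.Try

// Option 1: cast once at the DataFrame level, after which getAs[Double] works:
//   df.withColumn("latitude", df("latitude").cast("double"))

// Option 2: read the string by name from the Row and parse it defensively.
def latitudeOf(row: Row): Option[Double] = {
  val idx = row.fieldIndex("latitude") // position-independent lookup
  if (row.isNullAt(idx)) None
  else Try(row.getString(idx).trim.toDouble).toOption
}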


java heap space

2017-09-03 Thread KhajaAsmath Mohammed
Hi,

I am getting java.lang.OutOfMemoryError: Java heap space error whenever I
ran the spark sql job.

I came to the conclusion that the issue is caused by the number of files
being read by Spark.

I am reading 37 partitions and each partition has around 2000 files with a
file size of more than 128 MB, i.e. 37*2000 files read by Spark.

Can anyone suggest how to merge all of the files while reading in Spark and
run it efficiently?

Increasing executor memory didn't resolve my issue. I went from 16 GB to 64
GB, still no luck.

Thanks,
Asmath
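
If the pressure really comes from the sheer number of input splits, one common
pattern is to coalesce right after the read and before any heavy SQL, so far
fewer tasks are in flight at once. A minimal sketch with illustrative paths and
numbers; it is an assumption-laden example, not a guaranteed fix for the GC
overhead error:

import org.apache.spark.sql.SparkSession

object CoalesceAfterReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("coalesce-after-read-sketch")
      .enableHiveSupport()
      .getOrCreate()

    // Read all 37 partitions in one go; the path is an assumption.
    val raw = spark.read.parquet("/data/datapoint")

    // Collapse the ~37 * 2000 file splits into fewer partitions without a full
    // shuffle; 2000 is an assumed target sized so each task reads a few files.
    val compacted = raw.coalesce(2000)

    compacted.createOrReplaceTempView("datapoint_compacted")
    spark.sql("select count(*) from datapoint_compacted").show()
  }
}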


add arraylist to dataframe

2017-08-29 Thread KhajaAsmath Mohammed
Hi,

I am initializing an ArrayList before iterating through the map method. I am
always getting the list size value as zero after the map operation.

How do I add values to a list inside the map method of a DataFrame? Any
suggestions?

 val points = new
java.util.ArrayList[com.vividsolutions.jts.geom.Coordinate]()
import scala.collection.JavaConversions._
df.rdd.map { row =>
val latitude =
com.navistar.telematics.datascience.validation.PreValidation.getDefaultDoubleVal(row.getAs[String](Constants.Datapoint.Latitude))
val longitude =
com.navistar.telematics.datascience.validation.PreValidation.getDefaultDoubleVal(row.getAs[String](Constants.Datapoint.Longitude))
points.add(new Coordinate(latitude, longitude))

}
points.size is always zero.


Thanks,
Asmath
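
What happens here is that map is lazy (nothing runs until an action is called)
and its closure executes on the executors against a serialized copy of points,
so the driver-side list never changes. A hedged sketch of the usual
alternative, producing the values inside map and collecting them back; df,
Constants and the validation helper are the ones from the snippet above:

import com.navistar.telematics.datascience.validation.PreValidation
import com.vividsolutions.jts.geom.Coordinate

// Build the coordinates on the executors and bring them to the driver in one go.
val points: Array[Coordinate] = df.rdd.map { row =>
  val latitude = PreValidation.getDefaultDoubleVal(
    row.getAs[String](Constants.Datapoint.Latitude))
  val longitude = PreValidation.getDefaultDoubleVal(
    row.getAs[String](Constants.Datapoint.Longitude))
  new Coordinate(latitude, longitude)
}.collect()

// points.length now reflects the real row count; note that collect() pulls
// every row to the driver, so this only works when the result fits in memory.
println(points.length)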


Re: Spark hive overwrite is very very slow

2017-08-20 Thread KhajaAsmath Mohammed
I tried all the approaches.

1. Partitioned by year, month, day on the Hive table, with Parquet format,
when the table is created in Impala.
2. The Dataset from Hive is not partitioned. Used insert overwrite
hivePartitonedTable partition(year,month,day) select * from
tempViewOFDataset. Also tried
Dataset.write.mode(overwrite).insertInto(hivePartitonedTable)
3. Tried the approach of repartitioning the dataset before inserting into the
Hive table, as below.
unionedDS.repartition(unionedDS("year"),unionedDS("month"),unionedDS("day"))

None of the approaches helped me with performance.
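
For completeness, a hedged sketch that puts approaches 2 and 3 above together
in one place: line the data up by the partition columns, then insert through a
temp view with dynamic partitioning enabled. The names follow this thread
(db.parqut_table is the table name used elsewhere in it), and there is no
guarantee this fixes the 40-minute write:

// Repartition by the partition columns so each task writes to as few Hive
// partitions as possible, then insert through a temp view.
val prepared = unionedDS.repartition(
  unionedDS("year"), unionedDS("month"), unionedDS("day"))
prepared.createOrReplaceTempView("temp_datapoint_view")

sparkSession.sql("set hive.exec.dynamic.partition=true")
sparkSession.sql("set hive.exec.dynamic.partition.mode=nonstrict")
// With dynamic partitioning, the partition columns (year, month, day) must be
// the last columns of the select, in that order.
sparkSession.sql(
  "insert overwrite table db.parqut_table partition(year,month,day) " +
  "select * from temp_datapoint_view")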


On Sun, Aug 20, 2017 at 1:35 PM, ayan guha <guha.a...@gmail.com> wrote:

> Just curious - is your dataset partitioned on your partition columns?
>
> On Mon, 21 Aug 2017 at 3:54 am, KhajaAsmath Mohammed <
> mdkhajaasm...@gmail.com> wrote:
>
>> We are in cloudera CDH5.10 and we are using spark 2 that comes with
>> cloudera.
>>
>> Coming to second solution, creating a temporary view on dataframe but it
>> didnt improve my performance too.
>>
>> I do remember performance was very fast when doing whole overwrite table
>> without partitons but the problem started after using partitions.
>>
>> On Sun, Aug 20, 2017 at 12:46 PM, Jörn Franke <jornfra...@gmail.com>
>> wrote:
>>
>>> Ah i see then I would check also directly in Hive if you have issues to
>>> insert data in the Hive table. Alternatively you can try to register
>>> the df as temptable and do a insert into the Hive table from the temptable
>>> using Spark sql ("insert into table hivetable select * from temptable")
>>>
>>>
>>> You seem to use Cloudera so you probably have a very outdated Hive
>>> version. So you could switch to a distribution having a recent version of
>>> Hive 2 with Tez+llap - these are much more performant with much more
>>> features.
>>>
>>> Alternatively you can try to register the df as temptable and do a
>>> insert into the Hive table from the temptable using Spark sql ("insert into
>>> table hivetable select * from temptable")
>>>
>>> On 20. Aug 2017, at 18:47, KhajaAsmath Mohammed <mdkhajaasm...@gmail.com>
>>> wrote:
>>>
>>> Hi,
>>>
>>> I have created hive table in impala first with storage format as
>>> parquet. With dataframe from spark I am tryinig to insert into the same
>>> table with below syntax.
>>>
>>> Table is partitoned by year,month,day
>>> ds.write.mode(SaveMode.Overwrite).insertInto("db.parqut_table")
>>>
>>> https://issues.apache.org/jira/browse/SPARK-20049
>>>
>>> I saw something in the above link not sure if that is same thing in my
>>> case.
>>>
>>> Thanks,
>>> Asmath
>>>
>>> On Sun, Aug 20, 2017 at 11:42 AM, Jörn Franke <jornfra...@gmail.com>
>>> wrote:
>>>
>>>> Have you made sure that the saveastable stores them as parquet?
>>>>
>>>> On 20. Aug 2017, at 18:07, KhajaAsmath Mohammed <
>>>> mdkhajaasm...@gmail.com> wrote:
>>>>
>>>> we are using parquet tables, is it causing any performance issue?
>>>>
>>>> On Sun, Aug 20, 2017 at 9:09 AM, Jörn Franke <jornfra...@gmail.com>
>>>> wrote:
>>>>
>>>>> Improving the performance of Hive can be also done by switching to
>>>>> Tez+llap as an engine.
>>>>> Aside from this : you need to check what is the default format that it
>>>>> writes to Hive. One issue for the slow storing into a hive table could be
>>>>> that it writes by default to csv/gzip or csv/bzip2
>>>>>
>>>>> > On 20. Aug 2017, at 15:52, KhajaAsmath Mohammed <
>>>>> mdkhajaasm...@gmail.com> wrote:
>>>>> >
>>>>> > Yes we tried hive and want to migrate to spark for better
>>>>> performance. I am using paraquet tables . Still no better performance 
>>>>> while
>>>>> loading.
>>>>> >
>>>>> > Sent from my iPhone
>>>>> >
>>>>> >> On Aug 20, 2017, at 2:24 AM, Jörn Franke <jornfra...@gmail.com>
>>>>> wrote:
>>>>> >>
>>>>> >> Have you tried directly in Hive how the performance is?
>>>>> >>
>>>>> >> In which Format do you expect Hive to write? Have you made sure it
>>>>> is in this format? It

Re: Spark hive overwrite is very very slow

2017-08-20 Thread KhajaAsmath Mohammed
We are on Cloudera CDH 5.10 and we are using the Spark 2 that comes with
Cloudera.

Coming to the second solution, I tried creating a temporary view on the
DataFrame, but it didn't improve performance either.

I do remember performance was very fast when overwriting the whole table
without partitions, but the problem started after using partitions.

On Sun, Aug 20, 2017 at 12:46 PM, Jörn Franke <jornfra...@gmail.com> wrote:

> Ah i see then I would check also directly in Hive if you have issues to
> insert data in the Hive table. Alternatively you can try to register the
> df as temptable and do a insert into the Hive table from the temptable
> using Spark sql ("insert into table hivetable select * from temptable")
>
>
> You seem to use Cloudera so you probably have a very outdated Hive
> version. So you could switch to a distribution having a recent version of
> Hive 2 with Tez+llap - these are much more performant with much more
> features.
>
> Alternatively you can try to register the df as temptable and do a insert
> into the Hive table from the temptable using Spark sql ("insert into table
> hivetable select * from temptable")
>
> On 20. Aug 2017, at 18:47, KhajaAsmath Mohammed <mdkhajaasm...@gmail.com>
> wrote:
>
> Hi,
>
> I have created hive table in impala first with storage format as parquet.
> With dataframe from spark I am tryinig to insert into the same table with
> below syntax.
>
> Table is partitoned by year,month,day
> ds.write.mode(SaveMode.Overwrite).insertInto("db.parqut_table")
>
> https://issues.apache.org/jira/browse/SPARK-20049
>
> I saw something in the above link not sure if that is same thing in my
> case.
>
> Thanks,
> Asmath
>
> On Sun, Aug 20, 2017 at 11:42 AM, Jörn Franke <jornfra...@gmail.com>
> wrote:
>
>> Have you made sure that the saveastable stores them as parquet?
>>
>> On 20. Aug 2017, at 18:07, KhajaAsmath Mohammed <mdkhajaasm...@gmail.com>
>> wrote:
>>
>> we are using parquet tables, is it causing any performance issue?
>>
>> On Sun, Aug 20, 2017 at 9:09 AM, Jörn Franke <jornfra...@gmail.com>
>> wrote:
>>
>>> Improving the performance of Hive can be also done by switching to
>>> Tez+llap as an engine.
>>> Aside from this : you need to check what is the default format that it
>>> writes to Hive. One issue for the slow storing into a hive table could be
>>> that it writes by default to csv/gzip or csv/bzip2
>>>
>>> > On 20. Aug 2017, at 15:52, KhajaAsmath Mohammed <
>>> mdkhajaasm...@gmail.com> wrote:
>>> >
>>> > Yes we tried hive and want to migrate to spark for better performance.
>>> I am using paraquet tables . Still no better performance while loading.
>>> >
>>> > Sent from my iPhone
>>> >
>>> >> On Aug 20, 2017, at 2:24 AM, Jörn Franke <jornfra...@gmail.com>
>>> wrote:
>>> >>
>>> >> Have you tried directly in Hive how the performance is?
>>> >>
>>> >> In which Format do you expect Hive to write? Have you made sure it is
>>> in this format? It could be that you use an inefficient format (e.g. CSV +
>>> bzip2).
>>> >>
>>> >>> On 20. Aug 2017, at 03:18, KhajaAsmath Mohammed <
>>> mdkhajaasm...@gmail.com> wrote:
>>> >>>
>>> >>> Hi,
>>> >>>
>>> >>> I have written spark sql job on spark2.0 by using scala . It is just
>>> pulling the data from hive table and add extra columns , remove duplicates
>>> and then write it back to hive again.
>>> >>>
>>> >>> In spark ui, it is taking almost 40 minutes to write 400 go of data.
>>> Is there anything that I need to improve performance .
>>> >>>
>>> >>> Spark.sql.partitions is 2000 in my case with executor memory of 16gb
>>> and dynamic allocation enabled.
>>> >>>
>>> >>> I am doing insert overwrite on partition by
>>> >>> Da.write.mode(overwrite).insertinto(table)
>>> >>>
>>> >>> Any suggestions please ??
>>> >>>
>>> >>> Sent from my iPhone
>>> >>> 
>>> -
>>> >>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>> >>>
>>>
>>
>>
>


Re: Spark hive overwrite is very very slow

2017-08-20 Thread KhajaAsmath Mohammed
Hi,

I have created the Hive table in Impala first with Parquet as the storage
format. With a DataFrame from Spark I am trying to insert into the same table
with the syntax below.

The table is partitioned by year, month, day:
ds.write.mode(SaveMode.Overwrite).insertInto("db.parqut_table")

https://issues.apache.org/jira/browse/SPARK-20049

I saw something in the above link; not sure if that is the same thing in my case.

Thanks,
Asmath

On Sun, Aug 20, 2017 at 11:42 AM, Jörn Franke <jornfra...@gmail.com> wrote:

> Have you made sure that the saveastable stores them as parquet?
>
> On 20. Aug 2017, at 18:07, KhajaAsmath Mohammed <mdkhajaasm...@gmail.com>
> wrote:
>
> we are using parquet tables, is it causing any performance issue?
>
> On Sun, Aug 20, 2017 at 9:09 AM, Jörn Franke <jornfra...@gmail.com> wrote:
>
>> Improving the performance of Hive can be also done by switching to
>> Tez+llap as an engine.
>> Aside from this : you need to check what is the default format that it
>> writes to Hive. One issue for the slow storing into a hive table could be
>> that it writes by default to csv/gzip or csv/bzip2
>>
>> > On 20. Aug 2017, at 15:52, KhajaAsmath Mohammed <
>> mdkhajaasm...@gmail.com> wrote:
>> >
>> > Yes we tried hive and want to migrate to spark for better performance.
>> I am using paraquet tables . Still no better performance while loading.
>> >
>> > Sent from my iPhone
>> >
>> >> On Aug 20, 2017, at 2:24 AM, Jörn Franke <jornfra...@gmail.com> wrote:
>> >>
>> >> Have you tried directly in Hive how the performance is?
>> >>
>> >> In which Format do you expect Hive to write? Have you made sure it is
>> in this format? It could be that you use an inefficient format (e.g. CSV +
>> bzip2).
>> >>
>> >>> On 20. Aug 2017, at 03:18, KhajaAsmath Mohammed <
>> mdkhajaasm...@gmail.com> wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> I have written spark sql job on spark2.0 by using scala . It is just
>> pulling the data from hive table and add extra columns , remove duplicates
>> and then write it back to hive again.
>> >>>
>> >>> In spark ui, it is taking almost 40 minutes to write 400 go of data.
>> Is there anything that I need to improve performance .
>> >>>
>> >>> Spark.sql.partitions is 2000 in my case with executor memory of 16gb
>> and dynamic allocation enabled.
>> >>>
>> >>> I am doing insert overwrite on partition by
>> >>> Da.write.mode(overwrite).insertinto(table)
>> >>>
>> >>> Any suggestions please ??
>> >>>
>> >>> Sent from my iPhone
>> >>> -
>> >>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >>>
>>
>
>


Re: Spark hive overwrite is very very slow

2017-08-20 Thread KhajaAsmath Mohammed
We are using Parquet tables; is that causing any performance issue?

On Sun, Aug 20, 2017 at 9:09 AM, Jörn Franke <jornfra...@gmail.com> wrote:

> Improving the performance of Hive can be also done by switching to
> Tez+llap as an engine.
> Aside from this : you need to check what is the default format that it
> writes to Hive. One issue for the slow storing into a hive table could be
> that it writes by default to csv/gzip or csv/bzip2
>
> > On 20. Aug 2017, at 15:52, KhajaAsmath Mohammed <mdkhajaasm...@gmail.com>
> wrote:
> >
> > Yes we tried hive and want to migrate to spark for better performance. I
> am using paraquet tables . Still no better performance while loading.
> >
> > Sent from my iPhone
> >
> >> On Aug 20, 2017, at 2:24 AM, Jörn Franke <jornfra...@gmail.com> wrote:
> >>
> >> Have you tried directly in Hive how the performance is?
> >>
> >> In which Format do you expect Hive to write? Have you made sure it is
> in this format? It could be that you use an inefficient format (e.g. CSV +
> bzip2).
> >>
> >>> On 20. Aug 2017, at 03:18, KhajaAsmath Mohammed <
> mdkhajaasm...@gmail.com> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I have written spark sql job on spark2.0 by using scala . It is just
> pulling the data from hive table and add extra columns , remove duplicates
> and then write it back to hive again.
> >>>
> >>> In spark ui, it is taking almost 40 minutes to write 400 go of data.
> Is there anything that I need to improve performance .
> >>>
> >>> Spark.sql.partitions is 2000 in my case with executor memory of 16gb
> and dynamic allocation enabled.
> >>>
> >>> I am doing insert overwrite on partition by
> >>> Da.write.mode(overwrite).insertinto(table)
> >>>
> >>> Any suggestions please ??
> >>>
> >>> Sent from my iPhone
> >>> -
> >>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >>>
>


Re: Spark hive overwrite is very very slow

2017-08-20 Thread KhajaAsmath Mohammed
Yes, we tried Hive and want to migrate to Spark for better performance. I am
using Parquet tables. Still no better performance while loading.

Sent from my iPhone

> On Aug 20, 2017, at 2:24 AM, Jörn Franke <jornfra...@gmail.com> wrote:
> 
> Have you tried directly in Hive how the performance is? 
> 
> In which Format do you expect Hive to write? Have you made sure it is in this 
> format? It could be that you use an inefficient format (e.g. CSV + bzip2).
> 
>> On 20. Aug 2017, at 03:18, KhajaAsmath Mohammed <mdkhajaasm...@gmail.com> 
>> wrote:
>> 
>> Hi,
>> 
>> I have written spark sql job on spark2.0 by using scala . It is just pulling 
>> the data from hive table and add extra columns , remove duplicates and then 
>> write it back to hive again.
>> 
>> In spark ui, it is taking almost 40 minutes to write 400 go of data. Is 
>> there anything that I need to improve performance .
>> 
>> Spark.sql.partitions is 2000 in my case with executor memory of 16gb and 
>> dynamic allocation enabled.
>> 
>> I am doing insert overwrite on partition by
>> Da.write.mode(overwrite).insertinto(table)
>> 
>> Any suggestions please ??
>> 
>> Sent from my iPhone
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark hive overwrite is very very slow

2017-08-19 Thread KhajaAsmath Mohammed
Hi,

I have written a Spark SQL job on Spark 2.0 using Scala. It just pulls
the data from a Hive table, adds extra columns, removes duplicates and then
writes it back to Hive again.

In the Spark UI, it is taking almost 40 minutes to write 400 GB of data. Is
there anything I can do to improve performance?

Spark.sql.partitions is 2000 in my case with executor memory of 16gb and 
dynamic allocation enabled.

I am doing insert overwrite on partition by
Da.write.mode(overwrite).insertinto(table)

Any suggestions please ??

Sent from my iPhone
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: GC overhead exceeded

2017-08-18 Thread KhajaAsmath Mohammed
Hi Pat,

I am using dynamic allocation with executor memory of 8 GB. Will check doing
static allocation by giving the number of executors and cores.

Thanks,
Asmath

Sent from my iPhone
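
For reference, a sketch of what the static sizing Pat describes below could
look like as configuration, using his illustrative numbers (5 cores and 25 GB
per executor, 20 executors for a 100-core / 500 GB cluster). The values are
assumptions for illustration, not a recommendation for this job, and in
practice they are usually passed on spark-submit rather than set in code:

import org.apache.spark.sql.SparkSession

// Static allocation sketch; the numbers mirror the sizing example quoted below.
val spark = SparkSession.builder()
  .appName("static-allocation-sketch")
  .config("spark.dynamicAllocation.enabled", "false")
  .config("spark.executor.instances", "20") // roughly total cores / cores per executor
  .config("spark.executor.cores", "5")      // about 5 concurrent tasks per executor
  .config("spark.executor.memory", "25g")   // roughly cluster memory / executor count
  .getOrCreate()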

> On Aug 18, 2017, at 10:39 AM, Patrick Alwell <palw...@hortonworks.com> wrote:
> 
> +1 what is the executor memory? You may need to adjust executor memory and 
> cores. For the sake of simplicity; each executor can handle 5 concurrent 
> tasks and should have 5 cores. So if your cluster has 100 cores, you’d have 
> 20 executors. And if your cluster memory is 500gb, each executor would have  
> 25gb of memory.
>  
> What’s more, you can use tools like the Spark UI or Ganglia to determine 
> which step is failing and why. What is the overall cluster size? How many 
> executors do you have? Is it an appropriate count for this cluster’s cores? 
> I’m assuming you are using YARN?
>  
> -Pat
>  
> From: KhajaAsmath Mohammed <mdkhajaasm...@gmail.com>
> Date: Friday, August 18, 2017 at 5:30 AM
> To: Pralabh Kumar <pralabhku...@gmail.com>
> Cc: "user @spark" <user@spark.apache.org>
> Subject: Re: GC overhead exceeded
>  
> It is just a sql from hive table with transformation if adding 10 more 
> columns calculated for currency. Input size for this query is 2 months which 
> has around 450gb data.
>  
> I added persist but it didn't help. Also the executor memory is 8g . Any 
> suggestions please ?
> 
> Sent from my iPhone
> 
> On Aug 17, 2017, at 11:43 PM, Pralabh Kumar <pralabhku...@gmail.com> wrote:
> 
> what's is your exector memory , please share the code also
>  
> On Fri, Aug 18, 2017 at 10:06 AM, KhajaAsmath Mohammed 
> <mdkhajaasm...@gmail.com> wrote:
>  
> HI,
>  
> I am getting below error when running spark sql jobs. This error is thrown 
> after running 80% of tasks. any solution?
>  
> spark.storage.memoryFraction=0.4
> spark.sql.shuffle.partitions=2000
> spark.default.parallelism=100
> #spark.eventLog.enabled=false
> #spark.scheduler.revive.interval=1s
> spark.driver.memory=8g
>  
>  
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> at java.util.ArrayList.subList(ArrayList.java:955)
> at java.lang.String.split(String.java:2311)
> at 
> sun.net.util.IPAddressUtil.textToNumericFormatV4(IPAddressUtil.java:47)
> at java.net.InetAddress.getAllByName(InetAddress.java:1129)
> at java.net.InetAddress.getAllByName(InetAddress.java:1098)
> at java.net.InetAddress.getByName(InetAddress.java:1048)
> at org.apache.hadoop.net.NetUtils.normalizeHostName(NetUtils.java:562)
> at 
> org.apache.hadoop.net.NetUtils.normalizeHostNames(NetUtils.java:579)
> at 
> org.apache.hadoop.net.CachedDNSToSwitchMapping.resolve(CachedDNSToSwitchMapping.java:109)
> at 
> org.apache.hadoop.yarn.util.RackResolver.coreResolve(RackResolver.java:101)
> at 
> org.apache.hadoop.yarn.util.RackResolver.resolve(RackResolver.java:81)
> at 
> org.apache.spark.scheduler.cluster.YarnScheduler.getRackForHost(YarnScheduler.scala:37)
> at 
> org.apache.spark.scheduler.TaskSetManager.dequeueTask(TaskSetManager.scala:380)
> at 
> org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:433)
> at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:276)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
> at 
> org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:271)
> at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$9.apply(TaskSchedulerImpl.scala:357)
> at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$9.apply(TaskSchedulerImpl.scala:355)
> at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:355)
> at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:352)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:352)
> at 
> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:222)
>  
>  


Persist performance in Spark

2017-08-18 Thread KhajaAsmath Mohammed
Hi,

I am using persist before inserting the DataFrame data back into Hive. This
step is adding 8 minutes to my total execution time. Is there a way to reduce
the total time without resulting in out-of-memory issues? Here is my code.

val datapoint_df: Dataset[Row] = sparkSession.sql(transposeHiveQry);
datapoint_df.persist(StorageLevel.MEMORY_AND_DISK)
datapoint_df.createOrReplaceTempView("ds_tmp")
sparkSession.sql(" insert overwrite table HiveTable select * from ds_tmp")

Thanks,
Asmath
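
One thing to keep in mind: persist(StorageLevel.MEMORY_AND_DISK) forces a full
extra materialization of the dataset, which only pays for itself when the same
data is read more than once, and here the insert reads it exactly once. A
hedged sketch of the two usual variants, with names following the snippet
above (HiveTable stands for the real target table):

import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.storage.StorageLevel

// Variant 1: the dataset is consumed once by the insert, so skip persist entirely.
val datapoint_df: Dataset[Row] = sparkSession.sql(transposeHiveQry)
datapoint_df.createOrReplaceTempView("ds_tmp")
sparkSession.sql("insert overwrite table HiveTable select * from ds_tmp")

// Variant 2: only if the dataset really is reused elsewhere, a serialized level
// trades some CPU for a smaller memory footprint than MEMORY_AND_DISK.
// datapoint_df.persist(StorageLevel.MEMORY_AND_DISK_SER)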


Re: GC overhead exceeded

2017-08-18 Thread KhajaAsmath Mohammed
It is just a SQL query from a Hive table with a transformation of adding 10
more columns calculated for currency. The input size for this query is 2
months, which is around 450 GB of data.

I added persist but it didn't help. Also the executor memory is 8 GB. Any
suggestions please?

Sent from my iPhone

> On Aug 17, 2017, at 11:43 PM, Pralabh Kumar <pralabhku...@gmail.com> wrote:
> 
> what's is your exector memory , please share the code also
> 
>> On Fri, Aug 18, 2017 at 10:06 AM, KhajaAsmath Mohammed 
>> <mdkhajaasm...@gmail.com> wrote:
>> 
>> HI,
>> 
>> I am getting below error when running spark sql jobs. This error is thrown 
>> after running 80% of tasks. any solution?
>> 
>> spark.storage.memoryFraction=0.4
>> spark.sql.shuffle.partitions=2000
>> spark.default.parallelism=100
>> #spark.eventLog.enabled=false
>> #spark.scheduler.revive.interval=1s
>> spark.driver.memory=8g
>> 
>> 
>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>> at java.util.ArrayList.subList(ArrayList.java:955)
>> at java.lang.String.split(String.java:2311)
>> at 
>> sun.net.util.IPAddressUtil.textToNumericFormatV4(IPAddressUtil.java:47)
>> at java.net.InetAddress.getAllByName(InetAddress.java:1129)
>> at java.net.InetAddress.getAllByName(InetAddress.java:1098)
>> at java.net.InetAddress.getByName(InetAddress.java:1048)
>> at 
>> org.apache.hadoop.net.NetUtils.normalizeHostName(NetUtils.java:562)
>> at 
>> org.apache.hadoop.net.NetUtils.normalizeHostNames(NetUtils.java:579)
>> at 
>> org.apache.hadoop.net.CachedDNSToSwitchMapping.resolve(CachedDNSToSwitchMapping.java:109)
>> at 
>> org.apache.hadoop.yarn.util.RackResolver.coreResolve(RackResolver.java:101)
>> at 
>> org.apache.hadoop.yarn.util.RackResolver.resolve(RackResolver.java:81)
>> at 
>> org.apache.spark.scheduler.cluster.YarnScheduler.getRackForHost(YarnScheduler.scala:37)
>> at 
>> org.apache.spark.scheduler.TaskSetManager.dequeueTask(TaskSetManager.scala:380)
>> at 
>> org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:433)
>> at 
>> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:276)
>> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>> at 
>> org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:271)
>> at 
>> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$9.apply(TaskSchedulerImpl.scala:357)
>> at 
>> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$9.apply(TaskSchedulerImpl.scala:355)
>> at 
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at 
>> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>> at 
>> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:355)
>> at 
>> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:352)
>> at 
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>> at 
>> org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:352)
>> at 
>> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:222)
>> 
> 


GC overhead exceeded

2017-08-17 Thread KhajaAsmath Mohammed
HI,

I am getting below error when running spark sql jobs. This error is thrown
after running 80% of tasks. any solution?

spark.storage.memoryFraction=0.4
spark.sql.shuffle.partitions=2000
spark.default.parallelism=100
#spark.eventLog.enabled=false
#spark.scheduler.revive.interval=1s
spark.driver.memory=8g


java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.ArrayList.subList(ArrayList.java:955)
at java.lang.String.split(String.java:2311)
at
sun.net.util.IPAddressUtil.textToNumericFormatV4(IPAddressUtil.java:47)
at java.net.InetAddress.getAllByName(InetAddress.java:1129)
at java.net.InetAddress.getAllByName(InetAddress.java:1098)
at java.net.InetAddress.getByName(InetAddress.java:1048)
at
org.apache.hadoop.net.NetUtils.normalizeHostName(NetUtils.java:562)
at
org.apache.hadoop.net.NetUtils.normalizeHostNames(NetUtils.java:579)
at
org.apache.hadoop.net.CachedDNSToSwitchMapping.resolve(CachedDNSToSwitchMapping.java:109)
at
org.apache.hadoop.yarn.util.RackResolver.coreResolve(RackResolver.java:101)
at
org.apache.hadoop.yarn.util.RackResolver.resolve(RackResolver.java:81)
at
org.apache.spark.scheduler.cluster.YarnScheduler.getRackForHost(YarnScheduler.scala:37)
at
org.apache.spark.scheduler.TaskSetManager.dequeueTask(TaskSetManager.scala:380)
at
org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:433)
at
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:276)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
        at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:271)
at
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$9.apply(TaskSchedulerImpl.scala:357)
at
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$9.apply(TaskSchedulerImpl.scala:355)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:355)
at
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:352)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at
org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:352)
at
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:222)


Re: Write only one output file in Spark SQL

2017-08-11 Thread KhajaAsmath Mohammed
We had spark.sql.partitions set to 4, but in HDFS it ends up with 200 files;
4 files actually have data and the rest of them have zero bytes.

My only requirement is for the Hive insert overwrite query from the Spark
temporary table to run fast and end up with fewer files, instead of many
files with zero bytes.

I am using a Spark SQL Hive insert overwrite query, not the write method on
the DataFrame, as that is not supported in Spark 1.6 on a Kerberos cluster.
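
Since the insert selects straight from the registered temp table, one way to
cap the file count without touching spark.sql.shuffle.partitions is to
coalesce the DataFrame before registering it; the insert then writes from that
many partitions. A sketch in Scala syntax of the snippet quoted below
(coalesce and registerTempTable carry the same names in the PySpark 1.6 API
used here, and 4 output files is an assumed target):

// Coalesce before registering the temp table so the subsequent
// "INSERT OVERWRITE ... SELECT * FROM tempRaw" writes only a handful of files.
val compacted = union_df.coalesce(4) // assumed target file count
compacted.registerTempTable("tempRaw")

hc.sql("INSERT OVERWRITE TABLE blab.pyspark_dpprq SELECT * FROM tempRaw")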


On Fri, Aug 11, 2017 at 12:23 PM, Lukas Bradley <lukasbrad...@gmail.com>
wrote:

> Please show the write() call, and the results in HDFS.  What are all the
> files you see?
>
> On Fri, Aug 11, 2017 at 1:10 PM, KhajaAsmath Mohammed <
> mdkhajaasm...@gmail.com> wrote:
>
>> tempTable = union_df.registerTempTable("tempRaw")
>>
>> create = hc.sql('CREATE TABLE IF NOT EXISTS blab.pyspark_dpprq (vin
>> string, utctime timestamp, description string, descriptionuom string,
>> providerdesc string, dt_map string, islocation string, latitude double,
>> longitude double, speed double, value string)')
>>
>> insert = hc.sql('INSERT OVERWRITE TABLE blab.pyspark_dpprq SELECT * FROM
>> tempRaw')
>>
>>
>>
>>
>> On Fri, Aug 11, 2017 at 11:00 AM, Daniel van der Ende <
>> daniel.vandere...@gmail.com> wrote:
>>
>>> Hi Asmath,
>>>
>>> Could you share the code you're running?
>>>
>>> Daniel
>>>
>>> On Fri, 11 Aug 2017, 17:53 KhajaAsmath Mohammed, <
>>> mdkhajaasm...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>>
>>>>
>>>> I am using spark sql to write data back to hdfs and it is resulting in
>>>> multiple output files.
>>>>
>>>>
>>>>
>>>> I tried changing number spark.sql.shuffle.partitions=1 but it resulted
>>>> in very slow performance.
>>>>
>>>>
>>>>
>>>> Also tried coalesce and repartition still the same issue. any
>>>> suggestions?
>>>>
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Asmath
>>>>
>>>
>>
>

