Display a warning in EMR welcome screen

2024-05-11 Thread Abhishek Basu
Hi Team, for Elastic MapReduce (EMR) clusters, it would be great if the welcome 
screen showed a warning that logging should be handled carefully and that INFO 
or DEBUG levels should be enabled only when required. This logging issue took my 
whole day; I eventually discovered that it was filling up HDFS and preventing 
the cluster from doing its real work.

Thanks Abhishek


Sent from Yahoo Mail for iPhone
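
For anyone hitting the same problem, a minimal sketch of keeping driver-side log
output at WARN from application code is below (plain Spark API; the application
name and the WARN level are only illustrative, and executor-side log4j settings
are a separate, cluster-specific change):

import org.apache.spark.sql.SparkSession

object QuietLoggingExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("quiet-logging-example").getOrCreate()

    // Raise the driver's log threshold so INFO/DEBUG lines are not written,
    // which helps keep log directories (local disk or HDFS) from filling up.
    spark.sparkContext.setLogLevel("WARN")

    // ... run the actual job here ...

    spark.stop()
  }
}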


Re: Facing Error org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for s3ablock-0001-

2024-02-13 Thread Abhishek Singla
Hi Team,

Could someone provide some insights into this issue?

Regards,
Abhishek Singla

On Wed, Jan 17, 2024 at 11:45 PM Abhishek Singla <
abhisheksingla...@gmail.com> wrote:

> Hi Team,
>
> Version: 3.2.2
> Java Version: 1.8.0_211
> Scala Version: 2.12.15
> Cluster: Standalone
>
> I am using Spark Streaming to read from Kafka and write to S3. The job
> fails with the below error if no records are published to Kafka for a few
> days and then some records are published. Could someone help me identify
> the root cause of this job failure?
>
> 24/01/17 10:49:22 ERROR MicroBatchExecution: Query [id = 
> 72ee1070-7e05-4999-8b55-2a99e216ec51, runId = 
> 0919e548-9706-4757-be94-359848100070] terminated with error
> org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any 
> valid local directory for s3ablock-0001-
>   at 
> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:462)
>   at 
> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:165)
>   at 
> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:1019)
>   at 
> org.apache.hadoop.fs.s3a.S3ADataBlocks$DiskBlockFactory.create(S3ADataBlocks.java:816)
>   at 
> org.apache.hadoop.fs.s3a.S3ABlockOutputStream.createBlockIfNeeded(S3ABlockOutputStream.java:204)
>   at 
> org.apache.hadoop.fs.s3a.S3ABlockOutputStream.<init>(S3ABlockOutputStream.java:182)
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:1369)
>   at org.apache.hadoop.fs.FileSystem.primitiveCreate(FileSystem.java:1305)
>   at 
> org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:102)
>   at 
> org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:626)
>   at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:701)
>   at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:697)
>   at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
>   at org.apache.hadoop.fs.FileContext.create(FileContext.java:703)
>   at 
> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:327)
>   at 
> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:140)
>   at 
> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:143)
>   at 
> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:333)
>   at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$addNewBatchByStream$2(HDFSMetadataLog.scala:173)
>   at 
> scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>   at scala.Option.getOrElse(Option.scala:189)
>   at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.addNewBatchByStream(HDFSMetadataLog.scala:171)
>   at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:116)
>   at 
> org.apache.spark.sql.execution.streaming.OffsetSeqLog.add(OffsetSeqLog.scala:53)
>   at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$14(MicroBatchExecution.scala:442)
>   at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>   at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>   at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>   at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>   at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:440)
>   at 
> scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>   at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:627)
>   at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:380)
>   at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:210)
>   at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>   at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeT

Facing Error org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for s3ablock-0001-

2024-01-17 Thread Abhishek Singla
)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
at 
org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)


Dataset<Row> df =
    spark
        .readStream()
        .format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
        .options(appConfig.getKafka().getConf())
        .load()
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

df.writeStream()
    .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
    .option("checkpointLocation", appConfig.getChk().getPath())
    .start()
    .awaitTermination();


Regards,
Abhishek Singla
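
The "s3ablock-0001-" in the error refers to the temporary files the S3A connector
buffers on local disk before uploading blocks to S3. A minimal sketch of where
that buffering is configured is below; the property names are standard Hadoop S3A
settings, but the values shown, and the idea that an unusable or full buffer
directory is the cause here, are assumptions rather than a confirmed fix for this
job:

import org.apache.spark.sql.SparkSession

// Hypothetical settings showing where S3A buffers upload blocks locally.
val spark = SparkSession.builder()
  .appName("kafka-to-s3")
  // Directory (or comma-separated list) used for the s3ablock-* temp files;
  // the DiskErrorException means none of the configured directories was usable.
  .config("spark.hadoop.fs.s3a.buffer.dir", "/mnt/tmp/s3a")
  // Alternatively, buffer blocks in memory instead of on disk.
  .config("spark.hadoop.fs.s3a.fast.upload.buffer", "bytebuffer")
  .getOrCreate()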


Re: config: minOffsetsPerTrigger not working

2023-04-27 Thread Abhishek Singla
Thanks, Mich for acknowledging.

Yes, I am providing the checkpoint path. I omitted it here in the code
snippet.

I believe this is due to the Spark version being 3.1.x; this config is only
available in versions 3.2.0 and above.
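
For reference, a minimal sketch of how these options are passed to the Kafka
source on Spark 3.2.0 or later, where minOffsetsPerTrigger and maxTriggerDelay
are recognised (broker address, topic and values are only illustrative):

// spark is an existing SparkSession
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test-topic")
  .option("minOffsetsPerTrigger", "1000")  // wait for at least 1000 new offsets...
  .option("maxTriggerDelay", "15m")        // ...or at most 15 minutes
  .option("maxOffsetsPerTrigger", "1100")
  .load()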

On Thu, Apr 27, 2023 at 9:26 PM Mich Talebzadeh 
wrote:

> Is this all of your writeStream?
>
> df.writeStream()
> .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
> .start()
> .awaitTermination();
>
> What happened to the checkpoint location?
>
> option('checkpointLocation', checkpoint_path).
>
> example
>
>  checkpoint_path = "file:///ssd/hduser/MDBatchBQ/chkpt"
>
>
> ls -l  /ssd/hduser/MDBatchBQ/chkpt
> total 24
> -rw-r--r--. 1 hduser hadoop   45 Mar  1 09:27 metadata
> drwxr-xr-x. 5 hduser hadoop 4096 Mar  1 09:27 .
> drwxr-xr-x. 4 hduser hadoop 4096 Mar  1 10:31 ..
> drwxr-xr-x. 3 hduser hadoop 4096 Apr 22 11:27 sources
> drwxr-xr-x. 2 hduser hadoop 4096 Apr 24 11:09 offsets
> drwxr-xr-x. 2 hduser hadoop 4096 Apr 24 11:09 commits
>
> so you can see what is going on
>
> HTH
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 27 Apr 2023 at 15:46, Abhishek Singla 
> wrote:
>
>> Hi Team,
>>
>> I am using Spark Streaming to read from Kafka and write to S3.
>>
>> Version: 3.1.2
>> Scala Version: 2.12
>> Spark Kafka connector: spark-sql-kafka-0-10_2.12
>>
>> Dataset df =
>> spark
>> .readStream()
>> .format("kafka")
>> .options(appConfig.getKafka().getConf())
>> .load()
>> .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
>>
>> df.writeStream()
>> .foreachBatch(new KafkaS3PipelineImplementation(applicationId, 
>> appConfig))
>> .start()
>> .awaitTermination();
>>
>> kafka.conf = {
>>"kafka.bootstrap.servers": "localhost:9092",
>>"subscribe": "test-topic",
>>"minOffsetsPerTrigger": 1000,
>>"maxOffsetsPerTrigger": 1100,
>>"maxTriggerDelay": "15m",
>>"groupIdPrefix": "test",
>>"startingOffsets": "latest",
>>"includeHeaders": true,
>>"failOnDataLoss": false
>>   }
>>
>> spark.conf = {
>>"spark.master": "spark://localhost:7077",
>>"spark.app.name": "app",
>>"spark.sql.streaming.kafka.useDeprecatedOffsetFetching": 
>> false,
>>"spark.sql.streaming.metricsEnabled": true
>>  }
>>
>>
>> But these configs do not seem to be working as I can see Spark processing
>> batches of 3k-15k immediately one after another. Is there something I am
>> missing?
>>
>> Ref:
>> https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
>>
>> Regards,
>> Abhishek Singla
>>


config: minOffsetsPerTrigger not working

2023-04-27 Thread Abhishek Singla
Hi Team,

I am using Spark Streaming to read from Kafka and write to S3.

Version: 3.1.2
Scala Version: 2.12
Spark Kafka connector: spark-sql-kafka-0-10_2.12

Dataset<Row> df =
    spark
        .readStream()
        .format("kafka")
        .options(appConfig.getKafka().getConf())
        .load()
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

df.writeStream()
    .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
    .start()
    .awaitTermination();

kafka.conf = {
   "kafka.bootstrap.servers": "localhost:9092",
   "subscribe": "test-topic",
   "minOffsetsPerTrigger": 1000,
   "maxOffsetsPerTrigger": 1100,
   "maxTriggerDelay": "15m",
   "groupIdPrefix": "test",
   "startingOffsets": "latest",
   "includeHeaders": true,
   "failOnDataLoss": false
  }

spark.conf = {
   "spark.master": "spark://localhost:7077",
   "spark.app.name": "app",
   "spark.sql.streaming.kafka.useDeprecatedOffsetFetching": false,
   "spark.sql.streaming.metricsEnabled": true
 }


But these configs do not seem to be working, as I can see Spark processing
batches of 3k-15k records immediately one after another. Is there something I am
missing?

Ref:
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

Regards,
Abhishek Singla


RE: Regarding spark-3.2.0 decommission features.

2022-01-26 Thread Rao, Abhishek (Nokia - IN/Bangalore)
Hi Dongjoon Hyun,

Any inputs on the below issue would be helpful. Please let us know if we're 
missing anything.

Thanks and Regards,
Abhishek

From: Patidar, Mohanlal (Nokia - IN/Bangalore) 
Sent: Thursday, January 20, 2022 11:58 AM
To: user@spark.apache.org
Subject: Suspected SPAM - RE: Regarding spark-3.2.0 decommission features.

Gentle reminder!!!

Br,
-Mohan Patidar



From: Patidar, Mohanlal (Nokia - IN/Bangalore)
Sent: Tuesday, January 18, 2022 2:02 PM
To: user@spark.apache.org
Cc: Rao, Abhishek (Nokia - IN/Bangalore) <abhishek@nokia.com>; Gowda Tp, Thimme 
(Nokia - IN/Bangalore) <thimme.gowda...@nokia.com>; Sharma, Prakash 
(Nokia - IN/Bangalore) <prakash.sha...@nokia.com>; Tarun, N (Nokia - 
IN/Bangalore) <n.ta...@nokia.com>; Badagandi, Srinivas B. (Nokia - IN/Bangalore) 
<srinivas.b.badaga...@nokia.com>
Subject: Regarding spark-3.2.0 decommission features.

Hi,
 We're using Spark 3.2.0 and we have enabled the spark decommission 
feature. As part of validating this feature, we wanted to check if the rdd 
blocks and shuffle blocks from the decommissioned executors are migrated to 
other executors.
However, we could not see this happening. Below is the configuration we used.

  1.  Spark Configuration used:
 spark.local.dir /mnt/spark-ldir
 spark.decommission.enabled true
 spark.storage.decommission.enabled true
 spark.storage.decommission.rddBlocks.enabled true
 spark.storage.decommission.shuffleBlocks.enabled true
 spark.dynamicAllocation.enabled true
  2.  Brought up spark-driver and executors on the different nodes.
NAME                                           READY  STATUS   NODE
decommission-driver                            1/1    Running  Node1
gzip-compression-test-ae0b0b7e4d7fbe40-exec-1  1/1    Running  Node1
gzip-compression-test-ae0b0b7e4d7fbe40-exec-2  1/1    Running  Node2
gzip-compression-test-ae0b0b7e4d7fbe40-exec-3  1/1    Running  Node1
gzip-compression-test-ae0b0b7e4d7fbe40-exec-4  1/1    Running  Node2
gzip-compression-test-ae0b0b7e4d7fbe40-exec-5  1/1    Running  Node1
  3.  Bring down Node2, so the status of the pods is as follows.

NAME                                           READY  STATUS       NODE
decommission-driver                            1/1    Running      Node1
gzip-compression-test-ae0b0b7e4d7fbe40-exec-1  1/1    Running      Node1
gzip-compression-test-ae0b0b7e4d7fbe40-exec-2  1/1    Terminating  Node2
gzip-compression-test-ae0b0b7e4d7fbe40-exec-3  1/1    Running      Node1
gzip-compression-test-ae0b0b7e4d7fbe40-exec-4  1/1    Terminating  Node2
gzip-compression-test-ae0b0b7e4d7fbe40-exec-5  1/1    Running      Node1
  4.  Driver logs:
{"type":"log", "level":"INFO", "time":"2022-01-12T08:55:28.296Z", 
"timezone":"UTC", "log":"Adding decommission script to lifecycle"}
{"type":"log", "level":"INFO", "time":"2022-01-12T08:55:28.459Z", 
"timezone":"UTC", "log":"Adding decommission script to lifecycle"}
{"type":"log", "level":"INFO", "time":"2022-01-12T08:55:28.564Z", 
"timezone":"UTC", "log":"Adding decommission script to lifecycle"}
{"type":"log", "level":"INFO", "time":"2022-01-12T08:55:28.601Z", 
"timezone":"UTC", "log":"Adding decommission script to lifecycle"}
{"type":"log", "level":"INFO", "time":"2022-01-12T08:55:28.667Z", 
"timezone":"UTC", "log":"Adding decommission script to lifecycle"}
{"type":"log", "level":"INFO", "time":"2022-01-12T08:58:21.885Z", 
"timezone":"UTC", "log":"Notify executor 5 to decommissioning."}
{"type":"log", "level":"INFO", "time":"2022-01-12T08:58:21.887Z", 
"timezone":"UTC", "log":"Notify executor 1 to decommissioning."}
{"type":"log", "level":"INFO", "t

Does Apache Spark 3 support GPU usage for Spark RDDs?

2021-09-21 Thread Abhishek Shakya
Hi,

I am currently trying to run genomic analysis pipelines using Hail (a library
for genomic analyses written in Python and Scala). Recently, Apache Spark 3
was released with support for GPU usage.

I tried the spark-rapids library to start an on-premise Slurm cluster with GPU
nodes. I was able to initialise the cluster. However, when I tried running
Hail tasks, the executors kept getting killed.

On querying the Hail forum, I got the response that:

  "That's a GPU code generator for Spark-SQL, and Hail doesn't use any
  Spark-SQL interfaces, only the RDD interfaces."

So, does Spark 3 not support GPU usage for RDD interfaces?


PS: The question is posted in stackoverflow as well: Link
<https://stackoverflow.com/questions/69273205/does-apache-spark-3-support-gpu-usage-for-spark-rdds>


Regards,
-

Abhishek Shakya
Senior Data Scientist 1,
Contact: +919002319890 | Email ID: abhishek.sha...@aganitha.ai
Aganitha Cognitive Solutions <https://aganitha.ai/>


[Spark Core] saveAsTextFile is unable to rename a directory using hadoop-azure NativeAzureFileSystem

2021-09-13 Thread Abhishek Jindal
Hello,

I am trying to use the Spark rdd.saveAsTextFile function which calls the
FileSystem.rename() under the hood. This errors out with
“com.microsoft.azure.storage.StorageException: One of the request inputs is
not valid” when using hadoop-azure NativeAzureFileSystem. I have written a
small test program to rename a directory in Azure Blob Storage in Scala
that replicates this issue. Here is my code -

import java.net.URI

import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import scala.util.control.NonFatal

/**
  * A utility to test renaming a hadoop-azure path.
  */
object AzureRenameTester {

  def main(args: Array[String]): Unit = {

    if (args.isEmpty) {
      throw new IllegalArgumentException("The Azure Blob storage key must be provided!")
    }

    val key = args.head
    val hadoopConfig = new Configuration()
    hadoopConfig.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    hadoopConfig.set("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    hadoopConfig.set("fs.AbstractFileSystem.wasbs.Impl", "org.apache.hadoop.fs.azure.Wasbs")
    hadoopConfig.set("fs.azure.account.key..blob.core.windows.net", key)

    val input = new URI("wasbs://@.blob.core.windows.net/testing")
    val inputPath = new Path(input)
    val output = new URI("wasbs://@.blob.core.windows.net/testingRenamed")
    val outputPath = new Path(output)
    val hadoopFs = FileSystem.get(input, hadoopConfig)

    try {
      println(s"Renaming from $inputPath to $outputPath")
      hadoopFs.rename(inputPath, outputPath)
    } catch {
      case NonFatal(ex) =>
        println(s"${ExceptionUtils.getMessage(ex)}")
        println(s"${ExceptionUtils.getRootCause(ex)}")
        throw ex
    }
  }
}

This code leads to the following error -

[error] Exception in thread "main"
org.apache.hadoop.fs.azure.AzureException:
com.microsoft.azure.storage.StorageException: One of the request
inputs is not valid.
[error] at 
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2849)
[error] at 
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2721)
[error] at 
org.apache.hadoop.fs.azure.NativeAzureFileSystem$FolderRenamePending.execute(NativeAzureFileSystem.java:460)
[error] at 
org.apache.hadoop.fs.azure.NativeAzureFileSystem.rename(NativeAzureFileSystem.java:3277)
[error] at 
com.qf.util.hdfs.AzureRenameTester$.main(AzureRenameTester.scala:40)
[error] at 
com.qf.util.hdfs.AzureRenameTester.main(AzureRenameTester.scala)
[error] Caused by: com.microsoft.azure.storage.StorageException: One
of the request inputs is not valid.
[error] at 
com.microsoft.azure.storage.StorageException.translateException(StorageException.java:87)
[error] at 
com.microsoft.azure.storage.core.StorageRequest.materializeException(StorageRequest.java:315)
[error] at 
com.microsoft.azure.storage.core.ExecutionEngine.executeWithRetry(ExecutionEngine.java:185)
[error] at 
com.microsoft.azure.storage.blob.CloudBlob.startCopy(CloudBlob.java:735)
[error] at 
com.microsoft.azure.storage.blob.CloudBlob.startCopy(CloudBlob.java:691)
[error] at 
org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobWrapperImpl.startCopyFromBlob(StorageInterfaceImpl.java:434)
[error] at 
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2788)
[error] ... 5 more

I am currently using spark-core-3.1.1.jar with hadoop-azure-3.2.2.jar, but the
same issue also occurs with hadoop-azure-3.3.1.jar. Please advise how I should
solve this issue.

Thanks,
Abhishek


RE: Inclusive terminology usage in Spark

2021-06-30 Thread Rao, Abhishek (Nokia - IN/Bangalore)
HI Sean,

Thanks for the quick response. We’ll look into this.

Thanks and Regards,
Abhishek

From: Sean Owen 
Sent: Wednesday, June 30, 2021 6:30 PM
To: Rao, Abhishek (Nokia - IN/Bangalore) 
Cc: User 
Subject: Re: Inclusive terminology usage in Spark

This was covered and mostly done last year: 
https://issues.apache.org/jira/browse/SPARK-32004
In some instances, it's hard to change the terminology as it would break user 
APIs, and the marginal benefit may not be worth it, but, have a look at the 
remaining task under that umbrella.

On Wed, Jun 30, 2021 at 5:25 AM Rao, Abhishek (Nokia - IN/Bangalore) 
mailto:abhishek@nokia.com>> wrote:
Hi,


Terms such as Blacklist/Whitelist and master/slave are used in different places 
in the Spark code. We wanted to know if there are any plans to modify this to more 
inclusive terminology, e.g. Denylist/Allowlist and Leader/Follower? If so, 
what is the timeline?

I’ve also created an improvement ticket to track this.

https://issues.apache.org/jira/browse/SPARK-35952

Thanks and Regards,
Abhishek



Inclusive terminology usage in Spark

2021-06-30 Thread Rao, Abhishek (Nokia - IN/Bangalore)
Hi,


Terms such as Blacklist/Whitelist and master/slave are used in different places 
in the Spark code. We wanted to know if there are any plans to modify this to more 
inclusive terminology, e.g. Denylist/Allowlist and Leader/Follower? If so, 
what is the timeline?

I've also created an improvement ticket to track this.

https://issues.apache.org/jira/browse/SPARK-35952

Thanks and Regards,
Abhishek



RE: Why is Spark 3.0.x faster than Spark 3.1.x

2021-05-17 Thread Rao, Abhishek (Nokia - IN/Bangalore)
Hi Maziyar, Mich

Do we have any ticket to track this? Any idea if this is going to be fixed in 
3.1.2?

Thanks and Regards,
Abhishek

From: Mich Talebzadeh 
Sent: Friday, April 9, 2021 2:11 PM
To: Maziyar Panahi 
Cc: User 
Subject: Re: Why is Spark 3.0.x faster than Spark 3.1.x


Hi,

Regarding your point:

 I won't be able to defend this request by telling Spark users the previous 
major release was and still is more stable than the latest major release ...

With the benefit of hindsight version 3.1.1 was released recently and the 
definition of stable (from a practical point of view) does not come into it 
yet. That is perhaps the reason why some vendors like Cloudera are few releases 
away from the latest version. In production what matters most is the 
predictability and stability. You are not doing anything wrong by rolling it 
back and awaiting further clarification and resolution on the error.

HTH




 
   view my Linkedin 
profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Fri, 9 Apr 2021 at 08:58, Maziyar Panahi 
mailto:maziyar.pan...@iscpif.fr>> wrote:
Thanks Mich, I will ask all of our users to use pyspark 3.0.x and will change 
all the notebooks/scripts to switch back from 3.1.1 to 3.0.2.

That's being said, I won't be able to defend this request by telling Spark 
users the previous major release was and still is more stable than the latest 
major release, something that made everything default to 3.1.1 (pyspark, 
downloads, etc.).

I'll see if I can open a ticket for this as well.


On 8 Apr 2021, at 17:27, Mich Talebzadeh 
mailto:mich.talebza...@gmail.com>> wrote:

Well the normal course of action (considering laws of diminishing returns)  is 
that your mileage varies:

Spark 3.0.1 is pretty stable and good enough. Unless there is an overriding 
reason why you have to use 3.1.1, you can set it aside and try it when you have 
other use cases. For now I guess you can carry on with 3.0.1 as BAU.

HTH


 
   view my Linkedin 
profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Thu, 8 Apr 2021 at 16:19, Maziyar Panahi 
mailto:maziyar.pan...@iscpif.fr>> wrote:
I personally added the followings to my SparkSession in 3.1.1 and the result 
was exactly the same as before (local master). The 3.1.1 is still 4-5 times 
slower than 3.0.2 at least for that piece of code. I will do more investigation 
to see how it does with other stuff, especially anything without .transform or 
Spark ML related functions, but the small code I provided on any dataset that 
is big enough to take a minute to finish will show you the difference going 
from 3.0.2 to 3.1.1 by magnitude of 4-5:


.config("spark.sql.adaptive.coalescePartitions.enabled", "false")
.config("spark.sql.adaptive.enabled", "false")



On 8 Apr 2021, at 16:47, Mich Talebzadeh 
mailto:mich.talebza...@gmail.com>> wrote:

spark 3.1.1

I enabled the parameter

spark_session.conf.set("spark.sql.adaptive.enabled", "true")

to see it effects

in yarn cluster mode, i.e spark-submit --master yarn --deploy-mode client

with 4 executors it crashed the cluster.

I then reduced the number of executors to 2 and this time it ran OK but the 
performance is worse

I assume it adds some overhead?



 
   view my Linkedin 
profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Thu, 8 Apr 2021 at 15:

s3a staging committer (directory committer) not writing data to s3 bucket (final output directory) in spark3

2021-02-22 Thread Rao, Abhishek (Nokia - IN/Bangalore)
loaded" : 0,
"committer_tasks_failed" : 0,
"stream_bytes_skipped_on_seek" : 0,
   "op_list_files" : 0,
"files_deleted" : 0,
"stream_bytes_discarded_in_abort" : 0,
"op_mkdirs" : 1,
"op_copy_from_local_file" : 0,
"op_is_directory" : 1,
"s3guard_metadatastore_throttled" : 0,
"S3guard_metadatastore_put_path_latency75thPercentileLatency" : 0,
"stream_write_total_time" : 0,
"stream_backward_seek_operations" : 0,
"object_put_requests" : 4,
"object_put_bytes" : 16282,
"directories_deleted" : 0,
"op_is_file" : 2,
"S3guard_metadatastore_throttle_rate99thPercentileFrequency (Hz)" : 0
  },
  "diagnostics" : {
"fs.s3a.metadatastore.impl" : 
"org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore",
"fs.s3a.committer.magic.enabled" : "false",
"fs.s3a.metadatastore.authoritative" : "false"
  },
  "filenames" : [ ]
}

===
With the same S3 bucket, if I run the Spark job with Spark 2.4.5 it writes data to 
s3://rookbucket/shiva/people.parquet/ and the _SUCCESS file looks similar to the 
one above, but the "filenames" key in that JSON contains the list of part files 
(parquet data files), whereas with Spark 3 it is an empty list as shown above.
There is no exception or error during the write operation, but reads fail to get 
the schema because the parquet output is empty.

Not sure what is causing the issue. I have attached the Spark configuration used 
to submit the job (spark-default.conf).

I'm using Ceph as the underlying storage for the S3 buckets, and if I use the 
rados command to check the data I can see parquet data with file names containing 
the multipart upload in some path like below (but not in the final output S3 path):

bash-4.2# rados ls -p rook-ceph-store.rgw.buckets.data | grep "part-0-43466165-16d1-4b36-ab90-acb6c3c309a5"

Thanks and Regards,
Abhishek



spark-default.conf
Description: spark-default.conf
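
The spark-default.conf attachment is not reproduced in the archive. For context,
a minimal sketch of a typical S3A directory (staging) committer setup for Spark 3
is below; the property and class names come from the Spark/Hadoop cloud-committer
documentation, and the values are assumptions rather than the configuration
actually used here:

import org.apache.spark.sql.SparkSession

// Illustrative S3A "directory" (staging) committer wiring for Spark 3.
val spark = SparkSession.builder()
  .appName("s3a-directory-committer-example")
  .config("spark.hadoop.fs.s3a.committer.name", "directory")
  .config("spark.hadoop.fs.s3a.committer.staging.conflict-mode", "replace")
  // Route Spark's commit protocol through the Hadoop PathOutputCommitter
  // (classes from the spark-hadoop-cloud module).
  .config("spark.sql.sources.commitProtocolClass",
    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("spark.sql.parquet.output.committer.class",
    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  .getOrCreate()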


Trigger on GroupStateTimeout with no new data in group

2021-02-11 Thread Abhishek Gupta
Hi All,

I had a question about modeling a user-session kind of analytics use-case
in Spark Structured Streaming. Is there a way to model something like this
using arbitrary stateful streaming in Spark?

User session -> reads a few FAQS on a website and then decides to create a
ticket or not
FAQ Deflection Metrics:
i) Successful Deflection: No issues created within 5 mins of reading the
last FAQ
ii) Failed Deflection: Issue is created within 5 mins of reading FAQ

There are 3 cases here, 2 of which can be done using FlatMapGroupsWithState;
I am not sure about the 3rd, i.e.
i) Maintain user's last action state, if issue create event happens and
last state is FAQ view within 5 mins -> Failed deflection
ii) Maintain user's last state, if issue create and last state is FAQ view
beyond 5 mins -> Successful deflection
iii) Maintain the user's last state with a processing-time timeout of 5
mins, i.e. FAQ viewed at T1, no issue-creation event from the user, but the
time now is T1 + 5 mins, so we should increment Successful deflection.

Can we do it using Spark GroupStateTimeout? I was confused about whether a
timeout trigger can fire when no data is coming in for the group.
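
A minimal sketch of case (iii) with a processing-time timeout is below; the
Event/FaqState/Deflection types and the handling of cases (i) and (ii) are made
up for illustration. Note that, per the GroupState documentation, the timed-out
invocation only happens when a micro-batch runs, so some data has to arrive in
the stream (though not necessarily for that particular user) after the timeout
expires.

import java.sql.Timestamp

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Event(userId: String, action: String, ts: Timestamp)
case class FaqState(lastFaqViewMillis: Long)
case class Deflection(userId: String, successful: Boolean)

// Called once per user per micro-batch, and also when that user's state times out.
def track(userId: String,
          events: Iterator[Event],
          state: GroupState[FaqState]): Iterator[Deflection] = {
  if (state.hasTimedOut) {
    // 5 minutes passed with no issue created: successful deflection (case iii).
    state.remove()
    Iterator(Deflection(userId, successful = true))
  } else {
    val batch = events.toSeq
    if (batch.exists(_.action == "issue_created") && state.exists) {
      // Issue created while an FAQ-view state was still pending: failed deflection (case i).
      state.remove()
      Iterator(Deflection(userId, successful = false))
    } else {
      // Remember the latest FAQ view and (re)arm the 5-minute timeout (case iii).
      batch.filter(_.action == "faq_view").lastOption.foreach { e =>
        state.update(FaqState(e.ts.getTime))
        state.setTimeoutDuration("5 minutes")
      }
      Iterator.empty
    }
  }
}

// events: Dataset[Event] parsed from the source stream (parsing omitted).
def deflections(events: Dataset[Event]): Dataset[Deflection] = {
  import events.sparkSession.implicits._
  events
    .groupByKey(_.userId)
    .flatMapGroupsWithState(OutputMode.Append(), GroupStateTimeout.ProcessingTimeTimeout())(track)
}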


RE: Spark 3.0 using S3 taking long time for some set of TPC DS Queries

2020-09-10 Thread Rao, Abhishek (Nokia - IN/Bangalore)
Hi All,

We regenerated the TPC DS data on S3 and, after regeneration, we see that the 
queries are running faster and the execution time is now comparable with the 
execution time on HDFS with Spark 3.0.0.
So maybe there was some issue in generating the TPC DS data the first time, due 
to which we were seeing the discrepancy in query execution time on S3 with Spark 3.0.0.

Thanks and Regards,
Abhishek

From: Gourav Sengupta 
Sent: Wednesday, August 26, 2020 5:49 PM
To: Rao, Abhishek (Nokia - IN/Bangalore) 
Cc: user 
Subject: Re: Spark 3.0 using S3 taking long time for some set of TPC DS Queries

Hi
Can you try using emrfs?
Your study looks good best of luck.

Regards
Gourav

On Wed, 26 Aug 2020, 12:37 Rao, Abhishek (Nokia - IN/Bangalore), 
mailto:abhishek@nokia.com>> wrote:
Yeah… Not sure if I’m missing any configurations which is causing this issue. 
Any suggestions?

Thanks and Regards,
Abhishek

From: Gourav Sengupta 
mailto:gourav.sengu...@gmail.com>>
Sent: Wednesday, August 26, 2020 2:35 PM
To: Rao, Abhishek (Nokia - IN/Bangalore) 
mailto:abhishek@nokia.com>>
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: Spark 3.0 using S3 taking long time for some set of TPC DS Queries

Hi,

So the results does not make sense.


Regards,
Gourav

On Wed, Aug 26, 2020 at 9:04 AM Rao, Abhishek (Nokia - IN/Bangalore) 
mailto:abhishek@nokia.com>> wrote:
Hi Gourav,

Yes. We’re using s3a.

Thanks and Regards,
Abhishek

From: Gourav Sengupta 
mailto:gourav.sengu...@gmail.com>>
Sent: Wednesday, August 26, 2020 1:18 PM
To: Rao, Abhishek (Nokia - IN/Bangalore) 
mailto:abhishek@nokia.com>>
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: Spark 3.0 using S3 taking long time for some set of TPC DS Queries

Hi,

are you using s3a, which is not using EMRFS? In that case, these results does 
not make sense to me.

Regards,
Gourav Sengupta

On Mon, Aug 24, 2020 at 12:52 PM Rao, Abhishek (Nokia - IN/Bangalore) 
mailto:abhishek@nokia.com>> wrote:
Hi All,

We’re doing some performance comparisons between Spark querying data on HDFS vs 
Spark querying data on S3 (Ceph Object Store used for S3 storage) using 
standard TPC DS Queries. We are observing that Spark 3.0 with S3 is consuming 
significantly larger duration for some set of queries when compared with HDFS.
We also ran similar queries with Spark 2.4.5 querying data from S3, and we see 
that for this set of queries the time taken by Spark 2.4.5 is lower than with 
Spark 3.0, which looks very strange.
Below are the details of 9 queries where Spark 3.0 is taking >5 times the 
duration for running queries on S3 when compared to Hadoop.

Environment Details:

  *   Spark running on Kubernetes
  *   TPC DS Scale Factor: 500 GB
  *   Hadoop 3.x
  *   Same CPU and memory used for all executions

Query | Spark 3.0 S3 (s) | Spark 3.0 Hadoop (s) | Spark 2.4.5 S3 (s) | Spark 3.0 HDFS vs S3 (factor) | Spark 2.4.5 S3 vs Spark 3.0 S3 (factor) | Table involved
9     | 880.129 | 106.109 | 147.65  | 8.294574 | 5.960914 | store_sales
44    | 129.618 | 23.747  | 103.916 | 5.458289 | 1.247334 | store_sales
58    | 142.113 | 20.996  | 33.936  | 6.768575 | 4.187677 | store_sales
62    | 32.519  | 5.425   | 14.809  | 5.994286 | 2.195894 | web_sales
76    | 138.765 | 20.73   | 49.892  | 6.693922 | 2.781308 | store_sales
88    | 475.824 | 48.2    | 94.382  | 9.871867 | 5.04147  | store_sales
90    | 53.896  | 6.804   | 18.11   | 7.921223 | 2.976035 | web_sales
94    | 241.172 | 43.49   | 81.181  | 5.545459 | 2.970794 | web_sales
96    | 67.059  | 10.396  | 15.993  | 6.450462 | 4.193022 | store_sales

When we analysed it further, we see that all these queries are performing 
operations either on store_sales or web_sales tables and Spark 3 with S3 seems 
to be downloading much more data from storage when compared to Spark 3 with 
Hadoop or Spark 2.4.5 with S3 and this is resulting in more time for query 
completion. I’m attaching the screen shots of Driver UI for one such instance 
(Query 9) for reference.
Also attached the spark configurations (Spark 3.0) used for these tests.

We’re not sure why Spark 3.0 on S3 is having this behaviour. Any inputs on what 
we’re missing?

Thanks and Regards,
Abhishek




RE: Spark 3.0 using S3 taking long time for some set of TPC DS Queries

2020-08-26 Thread Rao, Abhishek (Nokia - IN/Bangalore)
Yeah… Not sure if I’m missing any configurations which is causing this issue. 
Any suggestions?

Thanks and Regards,
Abhishek

From: Gourav Sengupta 
Sent: Wednesday, August 26, 2020 2:35 PM
To: Rao, Abhishek (Nokia - IN/Bangalore) 
Cc: user@spark.apache.org
Subject: Re: Spark 3.0 using S3 taking long time for some set of TPC DS Queries

Hi,

So the results does not make sense.


Regards,
Gourav

On Wed, Aug 26, 2020 at 9:04 AM Rao, Abhishek (Nokia - IN/Bangalore) 
mailto:abhishek@nokia.com>> wrote:
Hi Gourav,

Yes. We’re using s3a.

Thanks and Regards,
Abhishek

From: Gourav Sengupta 
mailto:gourav.sengu...@gmail.com>>
Sent: Wednesday, August 26, 2020 1:18 PM
To: Rao, Abhishek (Nokia - IN/Bangalore) 
mailto:abhishek@nokia.com>>
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: Spark 3.0 using S3 taking long time for some set of TPC DS Queries

Hi,

are you using s3a, which is not using EMRFS? In that case, these results does 
not make sense to me.

Regards,
Gourav Sengupta

On Mon, Aug 24, 2020 at 12:52 PM Rao, Abhishek (Nokia - IN/Bangalore) 
mailto:abhishek@nokia.com>> wrote:
Hi All,

We’re doing some performance comparisons between Spark querying data on HDFS vs 
Spark querying data on S3 (Ceph Object Store used for S3 storage) using 
standard TPC DS Queries. We are observing that Spark 3.0 with S3 is consuming 
significantly larger duration for some set of queries when compared with HDFS.
We also ran similar queries with Spark 2.4.5 querying data from S3, and we see 
that for this set of queries the time taken by Spark 2.4.5 is lower than with 
Spark 3.0, which looks very strange.
Below are the details of 9 queries where Spark 3.0 is taking >5 times the 
duration for running queries on S3 when compared to Hadoop.

Environment Details:

  *   Spark running on Kubernetes
  *   TPC DS Scale Factor: 500 GB
  *   Hadoop 3.x
  *   Same CPU and memory used for all executions

Query | Spark 3.0 S3 (s) | Spark 3.0 Hadoop (s) | Spark 2.4.5 S3 (s) | Spark 3.0 HDFS vs S3 (factor) | Spark 2.4.5 S3 vs Spark 3.0 S3 (factor) | Table involved
9     | 880.129 | 106.109 | 147.65  | 8.294574 | 5.960914 | store_sales
44    | 129.618 | 23.747  | 103.916 | 5.458289 | 1.247334 | store_sales
58    | 142.113 | 20.996  | 33.936  | 6.768575 | 4.187677 | store_sales
62    | 32.519  | 5.425   | 14.809  | 5.994286 | 2.195894 | web_sales
76    | 138.765 | 20.73   | 49.892  | 6.693922 | 2.781308 | store_sales
88    | 475.824 | 48.2    | 94.382  | 9.871867 | 5.04147  | store_sales
90    | 53.896  | 6.804   | 18.11   | 7.921223 | 2.976035 | web_sales
94    | 241.172 | 43.49   | 81.181  | 5.545459 | 2.970794 | web_sales
96    | 67.059  | 10.396  | 15.993  | 6.450462 | 4.193022 | store_sales

When we analysed it further, we see that all these queries are performing 
operations either on store_sales or web_sales tables and Spark 3 with S3 seems 
to be downloading much more data from storage when compared to Spark 3 with 
Hadoop or Spark 2.4.5 with S3 and this is resulting in more time for query 
completion. I’m attaching the screen shots of Driver UI for one such instance 
(Query 9) for reference.
Also attached the spark configurations (Spark 3.0) used for these tests.

We’re not sure why Spark 3.0 on S3 is having this behaviour. Any inputs on what 
we’re missing?

Thanks and Regards,
Abhishek




RE: Spark 3.0 using S3 taking long time for some set of TPC DS Queries

2020-08-26 Thread Rao, Abhishek (Nokia - IN/Bangalore)
Hi Gourav,

Yes. We’re using s3a.

Thanks and Regards,
Abhishek

From: Gourav Sengupta 
Sent: Wednesday, August 26, 2020 1:18 PM
To: Rao, Abhishek (Nokia - IN/Bangalore) 
Cc: user@spark.apache.org
Subject: Re: Spark 3.0 using S3 taking long time for some set of TPC DS Queries

Hi,

are you using s3a, which is not using EMRFS? In that case, these results does 
not make sense to me.

Regards,
Gourav Sengupta

On Mon, Aug 24, 2020 at 12:52 PM Rao, Abhishek (Nokia - IN/Bangalore) 
mailto:abhishek@nokia.com>> wrote:
Hi All,

We’re doing some performance comparisons between Spark querying data on HDFS vs 
Spark querying data on S3 (Ceph Object Store used for S3 storage) using 
standard TPC DS Queries. We are observing that Spark 3.0 with S3 is consuming 
significantly larger duration for some set of queries when compared with HDFS.
We also ran similar queries with Spark 2.4.5 querying data from S3, and we see 
that for this set of queries the time taken by Spark 2.4.5 is lower than with 
Spark 3.0, which looks very strange.
Below are the details of 9 queries where Spark 3.0 is taking >5 times the 
duration for running queries on S3 when compared to Hadoop.

Environment Details:

  *   Spark running on Kubernetes
  *   TPC DS Scale Factor: 500 GB
  *   Hadoop 3.x
  *   Same CPU and memory used for all executions

Query | Spark 3.0 S3 (s) | Spark 3.0 Hadoop (s) | Spark 2.4.5 S3 (s) | Spark 3.0 HDFS vs S3 (factor) | Spark 2.4.5 S3 vs Spark 3.0 S3 (factor) | Table involved
9     | 880.129 | 106.109 | 147.65  | 8.294574 | 5.960914 | store_sales
44    | 129.618 | 23.747  | 103.916 | 5.458289 | 1.247334 | store_sales
58    | 142.113 | 20.996  | 33.936  | 6.768575 | 4.187677 | store_sales
62    | 32.519  | 5.425   | 14.809  | 5.994286 | 2.195894 | web_sales
76    | 138.765 | 20.73   | 49.892  | 6.693922 | 2.781308 | store_sales
88    | 475.824 | 48.2    | 94.382  | 9.871867 | 5.04147  | store_sales
90    | 53.896  | 6.804   | 18.11   | 7.921223 | 2.976035 | web_sales
94    | 241.172 | 43.49   | 81.181  | 5.545459 | 2.970794 | web_sales
96    | 67.059  | 10.396  | 15.993  | 6.450462 | 4.193022 | store_sales

When we analysed it further, we see that all these queries are performing 
operations either on store_sales or web_sales tables and Spark 3 with S3 seems 
to be downloading much more data from storage when compared to Spark 3 with 
Hadoop or Spark 2.4.5 with S3 and this is resulting in more time for query 
completion. I’m attaching the screen shots of Driver UI for one such instance 
(Query 9) for reference.
Also attached the spark configurations (Spark 3.0) used for these tests.

We’re not sure why Spark 3.0 on S3 is having this behaviour. Any inputs on what 
we’re missing?

Thanks and Regards,
Abhishek




RE: Spark 3.0 using S3 taking long time for some set of TPC DS Queries

2020-08-25 Thread Rao, Abhishek (Nokia - IN/Bangalore)
Hi Luca,

Thanks for sharing the feedback. We'll include these recommendations in our 
tests. However, we feel the issue that we're seeing right now is due to the 
difference in size of data downloaded from storage by the executors. In case of 
S3, executors are downloading almost 50 GB of data whereas in case of HDFS, it 
is only 4.5 GB.
Any idea why this difference is there?


Thanks and Regards,
Abhishek

From: Luca Canali 
Sent: Monday, August 24, 2020 7:18 PM
To: Rao, Abhishek (Nokia - IN/Bangalore) 
Cc: user@spark.apache.org
Subject: RE: Spark 3.0 using S3 taking long time for some set of TPC DS Queries

Hi Abhishek,

Just a few ideas/comments on the topic:

When benchmarking/testing I find it useful to  collect a more complete view of 
resources usage and Spark metrics, beyond just measuring query elapsed time. 
Something like this:
https://github.com/cerndb/spark-dashboard

I'd rather not use dynamic allocation when benchmarking if possible, as it adds 
a layer of complexity when examining results.

If you suspect that reading from S3 vs. HDFS may play an important role on the 
performance you observe, you may want to drill down on that with a simple 
micro-benchmark, for example something like this (for Spark 3.0):

val df=spark.read.parquet("/TPCDS/tpcds_1500/store_sales")
df.write.format("noop").mode("overwrite").save

Best,
Luca

From: Rao, Abhishek (Nokia - IN/Bangalore) 
mailto:abhishek@nokia.com>>
Sent: Monday, August 24, 2020 13:50
To: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Spark 3.0 using S3 taking long time for some set of TPC DS Queries

Hi All,

We're doing some performance comparisons between Spark querying data on HDFS vs 
Spark querying data on S3 (Ceph Object Store used for S3 storage) using 
standard TPC DS Queries. We are observing that Spark 3.0 with S3 is consuming 
significantly larger duration for some set of queries when compared with HDFS.
We also ran similar queries with Spark 2.4.5 querying data from S3, and we see 
that for this set of queries the time taken by Spark 2.4.5 is lower than with 
Spark 3.0, which looks very strange.
Below are the details of 9 queries where Spark 3.0 is taking >5 times the 
duration for running queries on S3 when compared to Hadoop.

Environment Details:

  *   Spark running on Kubernetes
  *   TPC DS Scale Factor: 500 GB
  *   Hadoop 3.x
  *   Same CPU and memory used for all executions

Query | Spark 3.0 S3 (s) | Spark 3.0 Hadoop (s) | Spark 2.4.5 S3 (s) | Spark 3.0 HDFS vs S3 (factor) | Spark 2.4.5 S3 vs Spark 3.0 S3 (factor) | Table involved
9     | 880.129 | 106.109 | 147.65  | 8.294574 | 5.960914 | store_sales
44    | 129.618 | 23.747  | 103.916 | 5.458289 | 1.247334 | store_sales
58    | 142.113 | 20.996  | 33.936  | 6.768575 | 4.187677 | store_sales
62    | 32.519  | 5.425   | 14.809  | 5.994286 | 2.195894 | web_sales
76    | 138.765 | 20.73   | 49.892  | 6.693922 | 2.781308 | store_sales
88    | 475.824 | 48.2    | 94.382  | 9.871867 | 5.04147  | store_sales
90    | 53.896  | 6.804   | 18.11   | 7.921223 | 2.976035 | web_sales
94    | 241.172 | 43.49   | 81.181  | 5.545459 | 2.970794 | web_sales
96    | 67.059  | 10.396  | 15.993  | 6.450462 | 4.193022 | store_sales

When we analysed it further, we see that all these queries are performing 
operations either on store_sales or web_sales tables and Spark 3 with S3 seems 
to be downloading much more data from storage when compared to Spark 3 with 
Hadoop or Spark 2.4.5 with S3 and this is resulting in more time for query 
completion. I'm attaching the screen shots of Driver UI for one such instance 
(Query 9) for reference.
Also attached the spark configurations (Spark 3.0) used for these tests.

We're not sure why Spark 3.0 on S3 is having this behaviour. Any inputs on what 
we're missing?

Thanks and Regards,
Abhishek



RE: Spark Thrift Server in Kubernetes deployment

2020-06-22 Thread Rao, Abhishek (Nokia - IN/Bangalore)
Hi,

STS deployment on k8s is not supported out of the box.
We had done some minor changes in spark code to get Spark Thrift Server working 
on k8s.
Here is the PR that we had created.
https://github.com/apache/spark/pull/22433

Unfortunately, this could not be merged.

Thanks and Regards,
Abhishek

From: Subash K 
Sent: Monday, June 22, 2020 9:00 AM
To: user@spark.apache.org
Subject: Spark Thrift Server in Kubernetes deployment

Hi,

We are currently using Spark 2.4.4 with Spark Thrift Server (STS) to expose a 
JDBC interface to the reporting tools to generate reports from Spark tables.

Now, as we are analyzing containerized deployment of Spark and STS, I would like 
to understand whether STS deployment on Kubernetes is supported out of the box, 
because we were not able to find any documentation on how to configure and spin 
up the container for STS. Please help us with this.

Regards,
Subash Kunjupillai



RE: [External Sender] Spark Executor pod not getting created on kubernetes cluster

2019-10-07 Thread Rao, Abhishek (Nokia - IN/Bangalore)
Hi Manish,

Is this issue resolved? If not, please check the overlay network of your 
cluster. We had faced similar issues when we had problems with overlay 
networking.
In our case, the executor had spawned, but the communication between the driver 
and the executor had failed (due to issues with the overlay network), and we were 
seeing similar logs.

One way to quickly check this is to quarantine all worker nodes except one. 
This way both the driver and the executor will be launched on the same worker 
node. If driver/executor communication happens in this case, then it is confirmed 
that there is an issue with the overlay network.

Thanks and Regards,
Abhishek

From: manish gupta 
Sent: 01 October 2019 PM 09:20
To: Prudhvi Chennuru (CONT) 
Cc: user 
Subject: Re: [External Sender] Spark Executor pod not getting created on 
kubernetes cluster

Kube-api server logs are not enabled. I will enable and check and get back on 
this.

Regards
Manish Gupta

On Tue, Oct 1, 2019 at 9:05 PM Prudhvi Chennuru (CONT) 
mailto:prudhvi.chenn...@capitalone.com>> wrote:
If you are passing the service account for executors as spark property then 
executor will use the one you are passing not the default service account. Did 
you check the api server logs?

On Tue, Oct 1, 2019 at 11:07 AM manish gupta 
mailto:tomanishgupt...@gmail.com>> wrote:
While launching the driver pod I am passing the service account which has 
cluster role and has all the required permissions to create a new pod. So will 
driver pass the same details to API server while creating executor pod OR 
executors will be created with default service account?

Regards
Manish Gupta

On Tue, Oct 1, 2019 at 8:01 PM Prudhvi Chennuru (CONT) 
mailto:prudhvi.chenn...@capitalone.com>> wrote:
By default, executors use default service account in the namespace you are 
creating the driver and executors so i am guessing that executors don't have 
access to run on the cluster, if you check the kube-apisever logs you will know 
the issue
and try giving privileged access to default service account in the namespace 
you are creating the executors it should work.

On Tue, Oct 1, 2019 at 10:25 AM manish gupta 
mailto:tomanishgupt...@gmail.com>> wrote:
Hi Prudhvi

I can see this issue consistently. I am doing a POC wherein I am trying to 
create a dynamic spark cluster to run my job using spark submit on Kubernetes. 
On Minikube it works fine but on rbac enabled kubernetes it fails to launch 
executor pod. It is able to launch driver pod but not sure why it cannot launch 
executor pod even though it has ample resources. I don't see any error message in 
the logs apart from the warning message that I have provided above.
Not even a single executor pod is getting launched.

Regards
Manish Gupta

On Tue, Oct 1, 2019 at 6:31 PM Prudhvi Chennuru (CONT) 
mailto:prudhvi.chenn...@capitalone.com>> wrote:
Hi Manish,

Are you seeing this issue consistently or sporadically? and when 
you say executors are not launched not even a single executor created for that 
driver pod?

On Tue, Oct 1, 2019 at 1:43 AM manish gupta 
mailto:tomanishgupt...@gmail.com>> wrote:
Hi Team

I am trying to create a spark cluster on kubernetes with rbac enabled using 
spark submit job. I am using spark-2.4.1 version.
Spark submit is able to launch the driver pod by contacting Kubernetes API 
server but executor Pod is not getting launched. I can see the below warning 
message in the driver pod logs.

19/09/27 10:16:01 INFO TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
19/09/27 10:16:16 WARN TaskSchedulerImpl: Initial job has not accepted any 
resources; check your cluster UI to ensure that workers are registered and have 
sufficient resources

I have faced this issue in standalone spark clusters and resolved it but not 
sure how to resolve this issue in kubernetes. I have not given any 
ResourceQuota configuration in kubernetes rbac yaml file and there is ample 
memory and cpu available for any new pod/container to be launched.

Any leads/pointers to resolve this issue would be of great help.

Thanks and Regards
Manish Gupta


--
Thanks,
Prudhvi Chennuru.


The information contained in this e-mail is confidential and/or proprietary to 
Capital One and/or its affiliates and may only be used solely in performance of 
work or services for Capital One. The information transmitted herewith is 
intended only for use by the individual or entity to which it is addressed. If 
the reader of this message is not the intended recipient, you are hereby 
notified that any review, retransmission, dissemination, distribution, copying 
or other use of, or taking of any action in reliance upon this information is 
strictly prohibited. If you have received this communication in error, please 
contact the sender and delete the material from your computer.



--
Thanks,
Prudhvi Chennuru.



RE: web access to sparkUI on docker or k8s pods

2019-08-27 Thread Rao, Abhishek (Nokia - IN/Bangalore)
Hi,

We have seen this issue when we tried to bring up the UI on a custom ingress path 
(the default ingress path "/" works). Do you also have a similar configuration?
We tried setting spark.ui.proxyBase and spark.ui.reverseProxy, but it did not help.

As a workaround, we're using the ingress port (port on the edge node) for now. 
There is the option of using a NodePort as well. That also works.

Thanks and Regards,
Abhishek

From: Yaniv Harpaz 
Sent: Tuesday, August 27, 2019 7:34 PM
To: user@spark.apache.org
Subject: web access to sparkUI on docker or k8s pods

hello guys,
when I launch driver pods or even when I use docker run with the spark image,
the spark master UI (8080) works great,
but the sparkUI (4040) is loading w/o the CSS

when I dig a bit deeper I see
"Refused to apply style from '' because its MIME type ('text/html') is not 
supported stylesheet MIME type, and strict MIME checking is enabled."

what am I missing here?
Yaniv

Yaniv Harpaz
[ yaniv.harpaz at gmail.com<http://gmail.com> ]


Re: New Spark Datasource for Hive ACID tables

2019-07-27 Thread Abhishek Somani
I realised that the build instructions in the README.md were not very clear
due to some recent changes. I have updated those now.

Thanks,
Abhishek Somani

On Sun, Jul 28, 2019 at 7:53 AM naresh Goud 
wrote:

> Thanks Abhishek.
> I will check it out.
>
> Thank you,
> Naresh
>
> On Sat, Jul 27, 2019 at 9:21 PM Abhishek Somani <
> abhisheksoman...@gmail.com> wrote:
>
>> Hey Naresh,
>>
>> There is a `shaded-dependencies` project inside the root directory. You
>> need to go into that and build and publish that to local first.
>>
>> cd shaded-dependencies
>>> sbt clean publishLocal
>>>
>>
>> After that, come back out to the root directory and build that project.
>> The spark-acid-shaded-dependencies jar will now be found:
>>
>>> cd ..
>>> sbt assembly
>>
>>
>> This will create the jar which you can use.
>>
>> On another note, unless you are making changes in the code, you don't
>> need to build yourself as the jar is published in
>> https://spark-packages.org/package/qubole/spark-acid. So you can just
>> use it as:
>>
>> spark-shell --packages qubole:spark-acid:0.4.0-s_2.11
>>
>>
>> ...and it will be automatically fetched and used.
>>
>> Thanks,
>> Abhishek
>>
>>
>> On Sun, Jul 28, 2019 at 4:42 AM naresh Goud 
>> wrote:
>>
>>> It looks there is some internal dependency missing.
>>>
>>> libraryDependencies ++= Seq(
>>> "com.qubole" %% "spark-acid-shaded-dependencies" % "0.1"
>>> )
>>>
>>> How do we get it?
>>>
>>>
>>> Thank you,
>>> Naresh
>>>
>>>
>>>
>>>
>>> Thanks,
>>> Naresh
>>> www.linkedin.com/in/naresh-dulam
>>> http://hadoopandspark.blogspot.com/
>>>
>>>
>>>
>>> On Sat, Jul 27, 2019 at 5:34 PM naresh Goud 
>>> wrote:
>>>
>>>> Hi Abhishek,
>>>>
>>>>
>>>> We are not able to build jar using git hub code with below error?
>>>>
>>>> Any others able to build jars? Is there anything else missing?
>>>>
>>>>
>>>>
>>>> Note: Unresolved dependencies path:
>>>> [warn]  com.qubole:spark-acid-shaded-dependencies_2.11:0.1
>>>> (C:\Data\Hadoop\spark-acid-master\build.sbt#L51-54)
>>>> [warn]+- com.qubole:spark-acid_2.11:0.4.0
>>>> sbt.ResolveException: unresolved dependency:
>>>> com.qubole#spark-acid-shaded-dependencies_2.11;0.1: not found
>>>> at sbt.IvyActions$.sbt$IvyActions$$resolve(IvyActions.scala:313)
>>>> at
>>>> sbt.IvyActions$$anonfun$updateEither$1.apply(IvyActions.scala:191)
>>>> at
>>>> sbt.IvyActions$$anonfun$updateEither$1.apply(IvyActions.scala:168)
>>>> at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:156)
>>>> at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:156)
>>>> at sbt.IvySbt$$anonfun$withIvy$1.apply(Ivy.scala:133)
>>>> at sbt.IvySbt.sbt$IvySbt$$action$1(Ivy.scala:57)
>>>> at sbt.IvySbt$$anon$4.call(Ivy.scala:65)
>>>> at xsbt.boot.Locks$GlobalLock.withChannel$1(Locks.scala:93)
>>>> at
>>>> xsbt.boot.Locks$GlobalLock.xsbt$boot$Locks$GlobalLock$$withChannelRetries$1(Locks.scala:78)
>>>> at
>>>> xsbt.boot.Locks$GlobalLock$$anonfun$withFileLock$1.apply(Locks.scala:97)
>>>> at xsbt.boot.Using$.withResource(Using.scala:10)
>>>> at xsbt.boot.Using$.apply(Using.scala:9)
>>>> at
>>>> xsbt.boot.Locks$GlobalLock.ignoringDeadlockAvoided(Locks.scala:58)
>>>> at xsbt.boot.Locks$GlobalLock.withLock(Locks.scala:48)
>>>> at xsbt.boot.Locks$.apply0(Locks.scala:31)
>>>> at xsbt.boot.Locks$.apply(Locks.scala:28)
>>>> at sbt.IvySbt.withDefaultLogger(Ivy.scala:65)
>>>> at sbt.IvySbt.withIvy(Ivy.scala:128)
>>>> at sbt.IvySbt.withIvy(Ivy.scala:125)
>>>> at sbt.IvySbt$Module.withModule(Ivy.scala:156)
>>>> at sbt.IvyActions$.updateEither(IvyActions.scala:168)
>>>> at
>>>> sbt.Classpaths$$anonfun$sbt$Classpaths$$work$1$1.apply(Defaults.scala:1541)
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
&g

Re: New Spark Datasource for Hive ACID tables

2019-07-27 Thread Abhishek Somani
Hey Naresh,

There is a `shaded-dependencies` project inside the root directory. You need
to go into that and build and publish that to local first.

cd shaded-dependencies
> sbt clean publishLocal
>

After that, come back out to the root directory and build that project. The
spark-acid-shaded-dependencies jar will now be found:

> cd ..
> sbt assembly


This will create the jar which you can use.

On another note, unless you are making changes in the code, you don't need
to build yourself as the jar is published in
https://spark-packages.org/package/qubole/spark-acid. So you can just use
it as:

spark-shell --packages qubole:spark-acid:0.4.0-s_2.11


...and it will be automatically fetched and used.

Thanks,
Abhishek


On Sun, Jul 28, 2019 at 4:42 AM naresh Goud 
wrote:

> It looks there is some internal dependency missing.
>
> libraryDependencies ++= Seq(
> "com.qubole" %% "spark-acid-shaded-dependencies" % "0.1"
> )
>
> How do we get it?
>
>
> Thank you,
> Naresh
>
>
>
>
> Thanks,
> Naresh
> www.linkedin.com/in/naresh-dulam
> http://hadoopandspark.blogspot.com/
>
>
>
> On Sat, Jul 27, 2019 at 5:34 PM naresh Goud 
> wrote:
>
>> Hi Abhishek,
>>
>>
>> We are not able to build jar using git hub code with below error?
>>
>> Any others able to build jars? Is there anything else missing?
>>
>>
>>
>> Note: Unresolved dependencies path:
>> [warn]  com.qubole:spark-acid-shaded-dependencies_2.11:0.1
>> (C:\Data\Hadoop\spark-acid-master\build.sbt#L51-54)
>> [warn]+- com.qubole:spark-acid_2.11:0.4.0
>> sbt.ResolveException: unresolved dependency:
>> com.qubole#spark-acid-shaded-dependencies_2.11;0.1: not found
>> at sbt.IvyActions$.sbt$IvyActions$$resolve(IvyActions.scala:313)
>> at
>> sbt.IvyActions$$anonfun$updateEither$1.apply(IvyActions.scala:191)
>> at
>> sbt.IvyActions$$anonfun$updateEither$1.apply(IvyActions.scala:168)
>> at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:156)
>> at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:156)
>> at sbt.IvySbt$$anonfun$withIvy$1.apply(Ivy.scala:133)
>> at sbt.IvySbt.sbt$IvySbt$$action$1(Ivy.scala:57)
>> at sbt.IvySbt$$anon$4.call(Ivy.scala:65)
>> at xsbt.boot.Locks$GlobalLock.withChannel$1(Locks.scala:93)
>> at
>> xsbt.boot.Locks$GlobalLock.xsbt$boot$Locks$GlobalLock$$withChannelRetries$1(Locks.scala:78)
>> at
>> xsbt.boot.Locks$GlobalLock$$anonfun$withFileLock$1.apply(Locks.scala:97)
>> at xsbt.boot.Using$.withResource(Using.scala:10)
>> at xsbt.boot.Using$.apply(Using.scala:9)
>> at
>> xsbt.boot.Locks$GlobalLock.ignoringDeadlockAvoided(Locks.scala:58)
>> at xsbt.boot.Locks$GlobalLock.withLock(Locks.scala:48)
>> at xsbt.boot.Locks$.apply0(Locks.scala:31)
>> at xsbt.boot.Locks$.apply(Locks.scala:28)
>> at sbt.IvySbt.withDefaultLogger(Ivy.scala:65)
>> at sbt.IvySbt.withIvy(Ivy.scala:128)
>> at sbt.IvySbt.withIvy(Ivy.scala:125)
>> at sbt.IvySbt$Module.withModule(Ivy.scala:156)
>> at sbt.IvyActions$.updateEither(IvyActions.scala:168)
>> at
>> sbt.Classpaths$$anonfun$sbt$Classpaths$$work$1$1.apply(Defaults.scala:1541)
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> Thanks,
>> Naresh
>> www.linkedin.com/in/naresh-dulam
>> http://hadoopandspark.blogspot.com/
>>
>>
>>
>> On Sat, Jul 27, 2019 at 3:25 PM Nicolas Paris 
>> wrote:
>>
>>> Congrats
>>>
>>> The read/write feature with hive3 is highly interesting
>>>
>>> On Fri, Jul 26, 2019 at 06:07:55PM +0530, Abhishek Somani wrote:
>>> > Hi All,
>>> >
>>> > We at Qubole have open sourced a datasource that will enable users to
>>> work on
>>> > their Hive ACID Transactional Tables using Spark.
>>> >
>>> > Github: https://github.com/qubole/spark-acid
>>> >
>>> > Hive ACID tables allow users to work on their data transactionally,
>>> and also
>>> > gives them the ability to Delete, Update and Merge data efficiently
>>> without
>>> > having to rewrite all of their data in a table, partition or file. We
>>> believe
>>> > that being able to work on these tables from Spark is a much desired
>>> value add,
>>> > as is also apparent in
>>> https://issues.apache.org/jira/browse/SPARK-15348 and
>>> > https://issues.apache.org/jira/browse/SPARK-16996 with multiple
>>> people looking
>>> > for it. Currently the datasource supports reading from these ACID
>>> tables only,
>>> > and we are working on adding the ability to write into these tables
>>> via Spark
>>> > as well.
>>> >
>>> > The datasource is also available as a spark package, and instructions
>>> on how to
>>> > use it are available on the Github page.
>>> >
>>> > We welcome your feedback and suggestions.
>>> >
>>> > Thanks,
>>> > Abhishek Somani
>>>
>>> --
>>> nicolas
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>


Re: New Spark Datasource for Hive ACID tables

2019-07-26 Thread Abhishek Somani
Hey Naresh,

Thanks for your question. Yes it will work!

Thanks,
Abhishek Somani

On Fri, Jul 26, 2019 at 7:08 PM naresh Goud 
wrote:

> Thanks Abhishek.
>
> Will it work on hive acid table which is not compacted ? i.e table having
> base and delta files?
>
> Let’s say hive acid table customer
>
> Create table customer(customer_id int, customer_name string,
> customer_email string) clustered by (customer_id) into 10 buckets location
> ‘/test/customer’ tblproperties('transactional'='true')
>
>
> And table hdfs path having below directories
>
> /test/customer/base_15234/
> /test/customer/delta_1234_456
>
>
> That means the table has updates and major compaction has not been run.
>
> Will the spark reader work?
>
>
> Thank you,
> Naresh
>
>
>
>
>
>
>
> On Fri, Jul 26, 2019 at 7:38 AM Abhishek Somani <
> abhisheksoman...@gmail.com> wrote:
>
>> Hi All,
>>
>> We at Qubole <https://www.qubole.com/> have open sourced a datasource
>> that will enable users to work on their Hive ACID Transactional Tables
>> <https://cwiki.apache.org/confluence/display/Hive/Hive+Transactions>
>> using Spark.
>>
>> Github: https://github.com/qubole/spark-acid
>>
>> Hive ACID tables allow users to work on their data transactionally, and
>> also gives them the ability to Delete, Update and Merge data efficiently
>> without having to rewrite all of their data in a table, partition or file.
>> We believe that being able to work on these tables from Spark is a much
>> desired value add, as is also apparent in
>> https://issues.apache.org/jira/browse/SPARK-15348 and
>> https://issues.apache.org/jira/browse/SPARK-16996 with multiple people
>> looking for it. Currently the datasource supports reading from these ACID
>> tables only, and we are working on adding the ability to write into these
>> tables via Spark as well.
>>
>> The datasource is also available as a spark package, and instructions on
>> how to use it are available on the Github page
>> <https://github.com/qubole/spark-acid>.
>>
>> We welcome your feedback and suggestions.
>>
>> Thanks,
>> Abhishek Somani
>>
> --
> Thanks,
> Naresh
> www.linkedin.com/in/naresh-dulam
> http://hadoopandspark.blogspot.com/
>
>


New Spark Datasource for Hive ACID tables

2019-07-26 Thread Abhishek Somani
Hi All,

We at Qubole <https://www.qubole.com/> have open sourced a datasource that
will enable users to work on their Hive ACID Transactional Tables
<https://cwiki.apache.org/confluence/display/Hive/Hive+Transactions> using
Spark.

Github: https://github.com/qubole/spark-acid

Hive ACID tables allow users to work on their data transactionally, and
also give them the ability to Delete, Update and Merge data efficiently
without having to rewrite all of their data in a table, partition or file.
We believe that being able to work on these tables from Spark is a much
desired value add, as is also apparent in
https://issues.apache.org/jira/browse/SPARK-15348 and
https://issues.apache.org/jira/browse/SPARK-16996 with multiple people
looking for it. Currently the datasource supports reading from these ACID
tables only, and we are working on adding the ability to write into these
tables via Spark as well.

The datasource is also available as a spark package, and instructions on
how to use it are available on the Github page
<https://github.com/qubole/spark-acid>.

We welcome your feedback and suggestions.

Thanks,
Abhishek Somani


RE: Spark on Kubernetes - log4j.properties not read

2019-06-10 Thread Rao, Abhishek (Nokia - IN/Bangalore)
Hi Dave,

As part of driver pod bringup, a configmap is created using all the spark 
configuration parameters (with the name spark.properties) and mounted to 
/opt/spark/conf. So all the other files present in /opt/spark/conf will be 
overwritten.
The same is happening with log4j.properties in this case. You could try to 
build the container with log4j.properties placed at some other location and 
point spark.driver.extraJavaOptions at that path.

Thanks and Regards,
Abhishek

From: Dave Jaffe 
Sent: Tuesday, June 11, 2019 6:45 AM
To: user@spark.apache.org
Subject: Spark on Kubernetes - log4j.properties not read

I am using Spark on Kubernetes from Spark 2.4.3. I have created a 
log4j.properties file in my local spark/conf directory and modified it so that 
the console (or, in the case of Kubernetes, the log) only shows warnings and 
higher (log4j.rootCategory=WARN, console). I then added the command
COPY conf /opt/spark/conf
to /root/spark/kubernetes/dockerfiles/spark/Dockerfile and built a new 
container.

However, when I run that under Kubernetes, the program runs successfully but 
/opt/spark/conf/log4j.properties is not used (I still see the INFO lines when I 
run kubectl logs ).

I have tried other things such as explicitly adding a --properties-file to my 
spark-submit command and even
--conf 
spark.driver.extraJavaOptions=-Dlog4j.configuration=file:///opt/spark/conf/log4j.properties

My log4j.properties file is never seen.

How do I customize log4j.properties with Kubernetes?

Thanks, Dave Jaffe



Spark Metrics : Job Remains In "Running" State

2019-03-18 Thread Jain, Abhishek 3. (Nokia - IN/Bangalore)
Hi Team,

We are executing a spark-submit job with metrics enabled (spark 2.4 on 
kubernetes) on a user-defined port (say 45010). We have observed that the job 
is not going into "Completed" state even after its completion (.stop()).
The pods for this spark-submit job remain in "Running" state. I am able to 
collect the metrics for both driver and executor/s on the defined port by using 
curl.

Below is the content of metrics.properties:
executor.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
executor.sink.csv.period=1
executor.sink.csv.directory=/tmp/
executor.sink.csv.unit=seconds
driver.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
driver.sink.csv.directory=/tmp/

*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
driver.sink.jmx.period=1
driver.sink.jmx.unit=seconds

# Enable JVM metrics source for all instances by class name
driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource


Spark Submit Job:
export HADOOP_CONF_DIR=;sudo -E ./spark-submit --verbose 
--deploy-mode cluster --master  --conf spark.app.name= 
--conf spark.executor.instances=2 

Please let me know if this is the expected behavior.

Regards,
Abhishek Jain


RE: Spark streaming filling the disk with logs

2019-02-14 Thread Jain, Abhishek 3. (Nokia - IN/Bangalore)
The properties provided earlier will work for standalone mode. For cluster 
mode, the below properties need to be added to the spark-submit:
--files "/log4j.properties" (to make log4j property file available 
for both driver and executor/s)

(to enable the extra java options for driver and executor/s)
--conf 
"spark.driver.extraJavaOptions=-Dlog4j.configuration=file:/log4j.properties"
--conf 
"spark.executor.extraJavaOptions=-Dlog4j.configuration=file:/log4j.properties"

Regards,
Abhishek Jain

From: em...@yeikel.com 
Sent: Friday, February 15, 2019 7:32 AM
To: Jain, Abhishek 3. (Nokia - IN/Bangalore) ; 
'Deepak Sharma' 
Cc: 'spark users' 
Subject: RE: Spark streaming filling the disk with logs

I have a quick question about this configuration. Particularly this line :

log4j.appender.rolling.file=/var/log/spark/

Where is that path at? At the driver level or for each executor individually?

Thank you

From: Jain, Abhishek 3. (Nokia - IN/Bangalore) 
mailto:abhishek.3.j...@nokia.com>>
Sent: Thursday, February 14, 2019 7:48 AM
To: Deepak Sharma mailto:deepakmc...@gmail.com>>
Cc: spark users mailto:user@spark.apache.org>>
Subject: RE: Spark streaming filling the disk with logs

++
If you can afford losing a few old logs, then you can make use of the rolling file 
Appender as well.

log4j.rootLogger=INFO, rolling
log4j.appender.rolling=org.apache.log4j.RollingFileAppender
log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
log4j.appender.rolling.maxFileSize=50MB
log4j.appender.rolling.maxBackupIndex=5
log4j.appender.rolling.file=/var/log/spark/
log4j.logger.org.apache.spark=

This means log4j will roll the log file by 50MB and keep only 5 recent files. 
These files are saved in /var/log/spark directory, with filename mentioned.

Regards,
Abhishek Jain

From: Jain, Abhishek 3. (Nokia - IN/Bangalore)
Sent: Thursday, February 14, 2019 5:58 PM
To: Deepak Sharma mailto:deepakmc...@gmail.com>>
Cc: spark users mailto:user@spark.apache.org>>
Subject: RE: Spark streaming filling the disk with logs

Hi Deepak,

The spark logging can be set for different purposes. Say for example if you 
want to control the spark-submit log, 
“log4j.logger.org.apache.spark.repl.Main=WARN/INFO/ERROR” can be set.

Similarly, to control third party logs:
log4j.logger.org.spark_project.jetty=, 
log4j.logger.org.apache.parquet= etc..

These properties can be set in the conf/log4j .properties file.

Hope this helps! 

Regards,
Abhishek Jain

From: Deepak Sharma mailto:deepakmc...@gmail.com>>
Sent: Thursday, February 14, 2019 12:10 PM
To: spark users mailto:user@spark.apache.org>>
Subject: Spark streaming filling the disk with logs

Hi All
I am running a spark streaming job with below configuration :

--conf "spark.executor.extraJavaOptions=-Droot.logger=WARN,console"

But it’s still filling the disk with info logs.
If the logging level is set to WARN at cluster level , then only the WARN logs 
are getting written but then it affects all the jobs .

Is there any way to get rid of INFO level of logging at spark streaming job 
level ?

Thanks
Deepak

--
Thanks
Deepak
www.bigdatabig.com<http://www.bigdatabig.com>
www.keosha.net<http://www.keosha.net>


RE: Spark streaming filling the disk with logs

2019-02-14 Thread Jain, Abhishek 3. (Nokia - IN/Bangalore)
++
If you can afford losing a few old logs, then you can make use of the rolling file 
Appender as well.

log4j.rootLogger=INFO, rolling
log4j.appender.rolling=org.apache.log4j.RollingFileAppender
log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
log4j.appender.rolling.maxFileSize=50MB
log4j.appender.rolling.maxBackupIndex=5
log4j.appender.rolling.file=/var/log/spark/
log4j.logger.org.apache.spark=

This means log4j will roll the log file by 50MB and keep only 5 recent files. 
These files are saved in /var/log/spark directory, with filename mentioned.

Regards,
Abhishek Jain

From: Jain, Abhishek 3. (Nokia - IN/Bangalore)
Sent: Thursday, February 14, 2019 5:58 PM
To: Deepak Sharma 
Cc: spark users 
Subject: RE: Spark streaming filling the disk with logs

Hi Deepak,

The spark logging can be set for different purposes. Say for example if you 
want to control the spark-submit log, 
“log4j.logger.org.apache.spark.repl.Main=WARN/INFO/ERROR” can be set.

Similarly, to control third party logs:
log4j.logger.org.spark_project.jetty=, 
log4j.logger.org.apache.parquet= etc..

These properties can be set in the conf/log4j .properties file.

Hope this helps! 

Regards,
Abhishek Jain

From: Deepak Sharma mailto:deepakmc...@gmail.com>>
Sent: Thursday, February 14, 2019 12:10 PM
To: spark users mailto:user@spark.apache.org>>
Subject: Spark streaming filling the disk with logs

Hi All
I am running a spark streaming job with below configuration :

--conf "spark.executor.extraJavaOptions=-Droot.logger=WARN,console"

But it’s still filling the disk with info logs.
If the logging level is set to WARN at cluster level , then only the WARN logs 
are getting written but then it affects all the jobs .

Is there any way to get rid of INFO level of logging at spark streaming job 
level ?

Thanks
Deepak

--
Thanks
Deepak
www.bigdatabig.com<http://www.bigdatabig.com>
www.keosha.net<http://www.keosha.net>


RE: Spark streaming filling the disk with logs

2019-02-14 Thread Jain, Abhishek 3. (Nokia - IN/Bangalore)
Hi Deepak,

The spark logging can be set for different purposes. Say for example if you 
want to control the spark-submit log, 
“log4j.logger.org.apache.spark.repl.Main=WARN/INFO/ERROR” can be set.

Similarly, to control third party logs:
log4j.logger.org.spark_project.jetty=, 
log4j.logger.org.apache.parquet= etc..

These properties can be set in the conf/log4j .properties file.

Hope this helps! 

Regards,
Abhishek Jain

From: Deepak Sharma 
Sent: Thursday, February 14, 2019 12:10 PM
To: spark users 
Subject: Spark streaming filling the disk with logs

Hi All
I am running a spark streaming job with below configuration :

--conf "spark.executor.extraJavaOptions=-Droot.logger=WARN,console"

But it’s still filling the disk with info logs.
If the logging level is set to WARN at cluster level , then only the WARN logs 
are getting written but then it affects all the jobs .

Is there any way to get rid of INFO level of logging at spark streaming job 
level ?

Thanks
Deepak

--
Thanks
Deepak
www.bigdatabig.com<http://www.bigdatabig.com>
www.keosha.net<http://www.keosha.net>


RE: Spark UI History server on Kubernetes

2019-01-23 Thread Rao, Abhishek (Nokia - IN/Bangalore)
Hi Lakshman,

We’ve set these 2 properties to bring up the spark history server:

spark.history.fs.logDirectory 
spark.history.ui.port 

We’re writing the logs to HDFS. In order to write the logs, we’re setting the following 
properties while submitting the spark job:
spark.eventLog.enabled true
spark.eventLog.dir 
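For completeness, the same two event-log settings can also be set on the SparkConf in code;
the HDFS directory below is only a placeholder and should match spark.history.fs.logDirectory:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs:///spark-history")   // placeholder event-log directory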

Thanks and Regards,
Abhishek

From: Battini Lakshman 
Sent: Wednesday, January 23, 2019 1:55 PM
To: Rao, Abhishek (Nokia - IN/Bangalore) 
Subject: Re: Spark UI History server on Kubernetes

HI Abhishek,

Thank you for your response. Could you please let me know the properties you 
configured for bringing up History Server and its UI.

Also, are you writing the logs to any directory on persistent storage, if yes, 
could you let me know the changes you did in Spark to write logs to that 
directory. Thanks!

Best Regards,
Lakshman Battini.

On Tue, Jan 22, 2019 at 10:53 PM Rao, Abhishek (Nokia - IN/Bangalore) 
mailto:abhishek@nokia.com>> wrote:
Hi,

We’ve setup spark-history service (based on spark 2.4) on K8S. UI works 
perfectly fine when running on NodePort. We’re facing some issues when on 
ingress.
Please let us know what kind of inputs do you need?

Thanks and Regards,
Abhishek

From: Battini Lakshman 
mailto:battini.laksh...@gmail.com>>
Sent: Tuesday, January 22, 2019 6:02 PM
To: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Spark UI History server on Kubernetes

Hello,

We are running Spark 2.4 on Kubernetes cluster, able to access the Spark UI 
using "kubectl port-forward".

However, this spark UI contains currently running Spark application logs, we 
would like to maintain the 'completed' spark application logs as well. Could 
someone help us to setup 'Spark History server' on Kubernetes. Thanks!

Best Regards,
Lakshman Battini.


RE: Spark UI History server on Kubernetes

2019-01-22 Thread Rao, Abhishek (Nokia - IN/Bangalore)
Hi,

We’ve setup spark-history service (based on spark 2.4) on K8S. UI works 
perfectly fine when running on NodePort. We’re facing some issues when on 
ingress.
Please let us know what kind of inputs do you need?

Thanks and Regards,
Abhishek

From: Battini Lakshman 
Sent: Tuesday, January 22, 2019 6:02 PM
To: user@spark.apache.org
Subject: Spark UI History server on Kubernetes

Hello,

We are running Spark 2.4 on Kubernetes cluster, able to access the Spark UI 
using "kubectl port-forward".

However, this spark UI contains currently running Spark application logs, we 
would like to maintain the 'completed' spark application logs as well. Could 
someone help us to setup 'Spark History server' on Kubernetes. Thanks!

Best Regards,
Lakshman Battini.


Re: [Spark Structured Streaming on K8S]: Debug - File handles/descriptor (unix pipe) leaking

2018-07-23 Thread Abhishek Tripathi
Hello Dev!
A Spark structured streaming job with a simple window aggregation is leaking
file descriptors on kubernetes (as the cluster manager). It seems to be a bug.
I am using HDFS as the FS for checkpointing.
Has anyone observed the same? Thanks for any help.

Please find more details in trailing email.


For more error log, please follow below Github gist:
https://gist.github.com/abhisheyke/1ecd952f2ae6af20cf737308a156f566
Some details about file descriptor (lsof):
https://gist.github.com/abhisheyke/27b073e7ac805ce9e6bb33c2b011bb5a
Code Snip:
https://gist.github.com/abhisheyke/6f838adf6651491bd4f263956f403c74

Thanks.

Best Regards,
*Abhishek Tripathi*


On Thu, Jul 19, 2018 at 10:02 AM Abhishek Tripathi 
wrote:

> Hello All!​​
> I am using spark 2.3.1 on kubernetes to run a structured streaming spark
> job which read stream from Kafka , perform some window aggregation and
> output sink to Kafka.
> After job running few hours(5-6 hours), the executor pods is getting
> crashed which is caused by "Too many open files in system".
> Digging in further, with "lsof" command, I can see there is a lot UNIX
> pipe getting opened.
>
> # lsof -p 14 | tail
> java 14 root *112u  a_inode   0,100  8838
> [eventpoll]
> java 14 root *113r FIFO0,9  0t0 252556158 pipe
> java 14 root *114w FIFO0,9  0t0 252556158 pipe
> java 14 root *115u  a_inode   0,100  8838
> [eventpoll]
> java 14 root *119r FIFO0,9  0t0 252552868 pipe
> java 14 root *120w FIFO0,9  0t0 252552868 pipe
> java 14 root *121u  a_inode   0,100  8838
> [eventpoll]
> java 14 root *131r FIFO0,9  0t0 252561014 pipe
> java 14 root *132w FIFO0,9  0t0 252561014 pipe
> java 14 root *133u  a_inode   0,100  8838
> [eventpoll]
>
> Total count of open fd is going up to 85K (increased hard ulimit) for
> each pod and once it hit the hard limit , executor pod is getting crashed.
> For shuffling I can think of it need more fd but in my case open fd count
> keep growing forever. Not sure how can I estimate how many fd will be
> adequate or there is a bug.
> With that uncertainty, I increased hard ulimit to large number as 85k but
> no luck.
> Seems like there is file descriptor leak.
>
> This spark job is running with native support of kubernetes as spark
> cluster manager. Currently using only two executor with 20 core(request)
> and 10GB (+6GB as memoryOverhead) of physical memory each.
>
> Have any one else seen the similar problem ?
> Thanks for any suggestion.
>
>
> Error details:
> Caused by: java.io.FileNotFoundException:
> /tmp/blockmgr-b530983c-39be-4c6d-95aa-3ad12a507843/24/temp_shuffle_bf774cf5-fadb-4ca7-a27b-a5be7b835eb6
> (Too many open files in system)
> at java.io.FileOutputStream.open0(Native Method)
> at java.io.FileOutputStream.open(FileOutputStream.java:270)
> at java.io.FileOutputStream.(FileOutputStream.java:213)
> at
> org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:103)
> at
> org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:116)
> at
> org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:237)
> at
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:109)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:748)
>
> For more error log, please follow below Github gist:
>
> https://gist.github.com/abhisheyke/1ecd952f2ae6af20cf737308a156f566
>
>
> Some details about file descriptor (lsof):
> https://gist.github.com/abhisheyke/27b073e7ac805ce9e6bb33c2b011bb5a
>
> Code Snip:
> https://gist.github.com/abhisheyke/6f838adf6651491bd4f263956f403c74
>
> Platform  Details:
> Kubernets Version : 1.9.2
> Docker : 17.3.2
> Spark version:  2.3.1
> Kafka version: 2.11-0.10.2.1 (both topic has 20 partition and getting
> almost 5k records/s )
> Hadoop version (Using hdfs for check pointing)  : 2.7.2
>
> Thank you for any help.
>
> Best Regards,
> *Abhishek Tripathi*
>
>


[Spark Structured Streaming on K8S]: Debug - File handles/descriptor (unix pipe) leaking

2018-07-19 Thread Abhishek Tripathi
Hello All!​​
I am using spark 2.3.1 on kubernetes to run a structured streaming spark
job which read stream from Kafka , perform some window aggregation and
output sink to Kafka.
After the job has been running for a few hours (5-6 hours), the executor pods crash
with "Too many open files in system".
Digging in further with the "lsof" command, I can see there are a lot of UNIX pipes
being opened.

# lsof -p 14 | tail
java 14 root *112u  a_inode   0,100  8838
[eventpoll]
java 14 root *113r FIFO0,9  0t0 252556158 pipe
java 14 root *114w FIFO0,9  0t0 252556158 pipe
java 14 root *115u  a_inode   0,100  8838
[eventpoll]
java 14 root *119r FIFO0,9  0t0 252552868 pipe
java 14 root *120w FIFO0,9  0t0 252552868 pipe
java 14 root *121u  a_inode   0,100  8838
[eventpoll]
java 14 root *131r FIFO0,9  0t0 252561014 pipe
java 14 root *132w FIFO0,9  0t0 252561014 pipe
java 14 root *133u  a_inode   0,100  8838
[eventpoll]

The total count of open fds goes up to 85K (with an increased hard ulimit) for each
pod, and once it hits the hard limit the executor pod crashes.
For shuffling I can understand that it needs more fds, but in my case the open fd count
keeps growing forever. I am not sure how to estimate how many fds would be
adequate, or whether there is a bug.
With that uncertainty, I increased the hard ulimit to a large number (85k) but
no luck.
It seems like there is a file descriptor leak.

This spark job is running with native support for kubernetes as the spark
cluster manager. I am currently using only two executors with 20 cores (request)
and 10GB (+6GB as memoryOverhead) of physical memory each.

Has anyone else seen a similar problem?
Thanks for any suggestion.


Error details:
Caused by: java.io.FileNotFoundException:
/tmp/blockmgr-b530983c-39be-4c6d-95aa-3ad12a507843/24/temp_shuffle_bf774cf5-fadb-4ca7-a27b-a5be7b835eb6
(Too many open files in system)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
at
org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:103)
at
org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:116)
at
org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:237)
at
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

For more error log, please follow below Github gist:

https://gist.github.com/abhisheyke/1ecd952f2ae6af20cf737308a156f566


Some details about file descriptor (lsof):
https://gist.github.com/abhisheyke/27b073e7ac805ce9e6bb33c2b011bb5a

Code Snip:
https://gist.github.com/abhisheyke/6f838adf6651491bd4f263956f403c74

Platform  Details:
Kubernets Version : 1.9.2
Docker : 17.3.2
Spark version:  2.3.1
Kafka version: 2.11-0.10.2.1 (both topic has 20 partition and getting
almost 5k records/s )
Hadoop version (Using hdfs for check pointing)  : 2.7.2

Thank you for any help.

Best Regards,
*Abhishek Tripathi*


Ingesting data in parallel across workers in Data Frame

2017-01-20 Thread Abhishek Gupta
I am trying to load data from a database into a DataFrame using the JDBC
driver. I want to get the data into partitions; the following document has a
nice explanation of how to achieve that:
https://docs.databricks.com/spark/latest/data-sources/sql-databases.html

The problem I am facing is that I don't have a numeric column which can be used
for achieving the partitioning.
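For reference, the partitioned read that the page above describes looks roughly like the
sketch below; the connection URL, table name, and bounds are placeholders, and
partitionColumn is exactly the numeric column that is missing in my case:

val df = spark.read.format("jdbc")
  .option("url", "jdbc:postgresql://dbhost:5432/mydb")   // placeholder connection URL
  .option("dbtable", "schema.my_table")                  // placeholder table
  .option("partitionColumn", "id")                       // must be numeric; this is the missing piece
  .option("lowerBound", "1")
  .option("upperBound", "1000000")
  .option("numPartitions", "8")
  .load()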

Any help would be appreciated.


Thank You

--Abhishek


Re: Could not parse Master URL for Mesos on Spark 2.1.0

2017-01-09 Thread Abhishek Bhandari
Glad that you found it.

On Mon, Jan 9, 2017 at 3:29 PM, Richard Siebeling <rsiebel...@gmail.com>
wrote:

> Probably found it, it turns out that Mesos should be explicitly added
> while building Spark, I assumed I could use the old build command that I
> used for building Spark 2.0.0... Didn't see the two lines added in the
> documentation...
>
> Maybe these kind of changes could be added in the changelog under changes
> of behaviour or changes in the build process or something like that,
>
> kind regards,
> Richard
>
>
> On 9 January 2017 at 22:55, Richard Siebeling <rsiebel...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I'm setting up Apache Spark 2.1.0 on Mesos and I am getting a "Could not
>> parse Master URL: 'mesos://xx.xx.xxx.xxx:5050'" error.
>> Mesos is running fine (both the master as the slave, it's a single
>> machine configuration).
>>
>> I really don't understand why this is happening since the same
>> configuration but using a Spark 2.0.0 is running fine within Vagrant.
>> Could someone please help?
>>
>> thanks in advance,
>> Richard
>>
>>
>>
>>
>


-- 
*Abhishek J Bhandari*
Mobile No. +1 510 493 6205 (USA)
Mobile No. +91 96387 93021 (IND)
*R & D Department*
*Valent Software Inc. CA*
Email: *abhis...@valent-software.com <abhis...@valent-software.com>*


Insert a JavaPairDStream into multiple cassandra table on the basis of key.

2016-11-03 Thread Abhishek Anand
Hi All,

I have a JavaPairDStream. I want to insert this dstream into multiple
cassandra tables on the basis of key. One approach is to filter each key
and then insert it into cassandra table. But this would call filter
operation "n" times depending on the number of keys.
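A rough sketch of that filter-per-key version is below (Scala shown for brevity; the
DataStax spark-cassandra-connector import, the keyspace, the table names, and the Event
value type are all assumptions):

import com.datastax.spark.connector._                    // assumed: spark-cassandra-connector on the classpath
import org.apache.spark.streaming.dstream.DStream

case class Event(id: Int, payload: String)               // placeholder value type

// one filter + save per key, i.e. the "n passes" over the RDD described above
def writeByKey(stream: DStream[(String, Event)], tableForKey: Map[String, String]): Unit =
  stream.foreachRDD { rdd =>
    tableForKey.foreach { case (key, table) =>
      rdd.filter(_._1 == key).map(_._2).saveToCassandra("my_keyspace", table)   // placeholder keyspace
    }
  }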

Is there any better approach to achieve this more quickly ?

Thanks
Abhi


MapWithState with large state

2016-10-31 Thread Abhishek Singh
Can it handle state that is larger than what memory will hold?


Restful WS for Spark

2016-09-30 Thread ABHISHEK
Hello all,
Have you tried accessing a Spark application using RESTful web services?

I have a requirement where a remote user submits a request with some data; it
should be sent to Spark and the job should run in Hadoop cluster mode. The output
should be sent back to the user.

Please share your  expertise.
Thanks,
Abhishek


Re: Spark Yarn Cluster with Reference File

2016-09-23 Thread ABHISHEK
I have tried with hdfs/tmp location but it didn't work. Same error.

On 23 Sep 2016 19:37, "Aditya" <aditya.calangut...@augmentiq.co.in> wrote:

> Hi Abhishek,
>
> Try below spark submit.
> spark-submit --master yarn --deploy-mode cluster  --files hdfs://
> abc.com:8020/tmp/abc.drl --class com.abc.StartMain
> abc-0.0.1-SNAPSHOT-jar-with-dependencies.jar abc.drl
> <http://abc.com:8020/tmp/abc.drl>
>
> On Friday 23 September 2016 07:29 PM, ABHISHEK wrote:
>
> Thanks for your response Aditya and Steve.
> Steve:
> I have tried specifying both /tmp/filename in hdfs and local path but it
> didn't work.
> You may be write that Kie session is configured  to  access files from
> Local path.
> I have attached code here for your reference and if you find some thing
> wrong, please help to correct it.
>
> Aditya:
> I have attached code here for reference. --File option will distributed
> reference file to all node but  Kie session is not able  to pickup it.
>
> Thanks,
> Abhishek
>
> On Fri, Sep 23, 2016 at 2:25 PM, Steve Loughran <ste...@hortonworks.com>
> wrote:
>
>>
>> On 23 Sep 2016, at 08:33, ABHISHEK <abhi...@gmail.com> wrote:
>>
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.FileNotFoundException: hdfs:/abc.com:8020/user/abhiet
>> c/abc.drl (No such file or directory)
>> at java.io.FileInputStream.open(Native Method)
>> at java.io.FileInputStream.(FileInputStream.java:146)
>> at org.drools.core.io.impl.FileSystemResource.getInputStream(Fi
>> leSystemResource.java:123)
>> at org.drools.compiler.kie.builder.impl.KieFileSystemImpl.
>> write(KieFileSystemImpl.java:58)
>>
>>
>>
>> Looks like this .KieFileSystemImpl class only works with local files, so
>> when it gets an HDFS path in it tries to open it and gets confused.
>>
>> you may need to write to a local FS temp file then copy it into HDFS
>>
>
>
>
>


Re: Spark Yarn Cluster with Reference File

2016-09-23 Thread ABHISHEK
Thanks for your response Aditya and Steve.
Steve:
I have tried specifying both /tmp/filename in hdfs and the local path but it
didn't work.
You may be right that the Kie session is configured to access files from a
local path.
I have attached the code here for your reference; if you find something
wrong, please help to correct it.

Aditya:
I have attached the code here for reference. The --files option will distribute the
reference file to all nodes but the Kie session is not able to pick it up.
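For what it's worth, a small sketch of resolving the --files copy to a node-local path via
SparkFiles; the file name is the one passed with --files, and whether the Kie API then
accepts that path is exactly the open question:

import org.apache.spark.SparkFiles

// --files abc.drl ships the file to each node's work dir;
// SparkFiles.get returns its absolute local path there
val localDrlPath = SparkFiles.get("abc.drl")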

Thanks,
Abhishek

On Fri, Sep 23, 2016 at 2:25 PM, Steve Loughran <ste...@hortonworks.com>
wrote:

>
> On 23 Sep 2016, at 08:33, ABHISHEK <abhi...@gmail.com> wrote:
>
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: hdfs:/abc.com:8020/user/
> abhietc/abc.drl (No such file or directory)
> at java.io.FileInputStream.open(Native Method)
> at java.io.FileInputStream.(FileInputStream.java:146)
> at org.drools.core.io.impl.FileSystemResource.getInputStream(
> FileSystemResource.java:123)
> at org.drools.compiler.kie.builder.impl.KieFileSystemImpl.write(
> KieFileSystemImpl.java:58)
>
>
>
> Looks like this .KieFileSystemImpl class only works with local files, so
> when it gets an HDFS path in it tries to open it and gets confused.
>
> you may need to write to a local FS temp file then copy it into HDFS
>
object Mymain {
  def main(args: Array[String]): Unit = {

   // @  abhishek 
   //val fileName = "./abc.drl"   //  code works if I run app in local mode 
   
  
val fileName = args(0)
val conf = new SparkConf().setAppName("LocalStreaming")
val sc = new SparkContext(conf) 
val ssc = new StreamingContext(sc, Seconds(1)) 
val brokers = "190.51.231.132:9092"
val groupId = "testgroup"
val offsetReset = "smallest"
val pollTimeout = "1000"
val topics = "NorthPole"
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
  ConsumerConfig.GROUP_ID_CONFIG -> groupId,
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> 
"org.apache.kafka.common.serialization.StringDeserializer",
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> 
"org.apache.kafka.common.serialization.StringDeserializer",
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> offsetReset,
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false",
  "spark.kafka.poll.time" -> pollTimeout)

val detailTable = "emp1 "
val summaryTable= "emp2"
val confHBase = HBaseConfiguration.create()
confHBase.set("hbase.zookeeper.quorum", "190.51.231.132")
confHBase.set("hbase.zookeeper.property.clientPort", "2181")
confHBase.set("hbase.master", "190.51.231.132:6")
val emp_detail_config = Job.getInstance(confHBase)
val emp_summary_config = Job.getInstance(confHBase)
 
emp_detail_config.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, 
"emp_detail");

emp_detail_config.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
  
emp_summary_config.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, 
"emp_summary")

emp_summary_config.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, 
StringDecoder](ssc, kafkaParams, topicsSet)

messages.foreachRDD(rdd => {
  if (rdd.count > 0) {

val messageRDD: RDD[String] = rdd.map { y => y._2 }
val inputJsonObjectRDD = messageRDD.map(row => 
Utilities.convertToJsonObject(row))

inputJsonObjectRDD.map(row => 
BuildJsonStrctures.buildTaxCalcDetail(row)).saveAsNewAPIHadoopDataset(emp_detail_config.getConfiguration)

val inputObjectRDD = messageRDD.map(row => 
Utilities.convertToSubmissionJavaObject(row))
val executedObjectRDD = inputObjectRDD.mapPartitions(row => 
KieSessionFactory.execute(row, fileName.toString()))
val executedJsonRDD = executedObjectRDD.map(row => 
Utilities.convertToSubmissionJSonString(row))
  .map(row => Utilities.convertToJsonObject(row))

val summaryInputJsonRDD = executedObjectRDD

summaryInputJsonRDD.map(row => 
BuildJsonStrctures.buildSummary2(row)).saveAsNewAPIHadoopDataset(emp_summary_config.getConfiguration)

  } else {
println("No message received") //this works only in master local mode
  }
})
ssc.start()
ssc.awaitTermination()

  }
}

Spark Yarn Cluster with Reference File

2016-09-23 Thread ABHISHEK
Hello there,

I have a Spark application which refers to an external file ‘abc.drl’
containing unstructured data.
The application is able to find this reference file if I run the app in local mode,
but in Yarn cluster mode it is not able to find the file at the
specified path.
I tried both local and hdfs paths with the --files option but it didn’t
work.


What is working?
1. The current Spark application runs fine if I run it in local mode as
mentioned below.
In the command below, the file path is a local path, not HDFS.
spark-submit --master local[*]  --class "com.abc.StartMain"
abc-0.0.1-SNAPSHOT-jar-with-dependencies.jar /home/abhietc/abc/abc.drl

3. I want to run this Spark application using Yarn in cluster mode.
For that, I used the commands below, but the application is not able to find the path
for the reference file abc.drl. I tried giving both local and HDFS paths but it
didn’t work.

spark-submit --master yarn --deploy-mode cluster  --files
/home/abhietc/abc/abc.drl --class com.abc.StartMain
abc-0.0.1-SNAPSHOT-jar-with-dependencies.jar /home/abhietc/abc/abc.drl

spark-submit --master yarn --deploy-mode cluster  --files hdfs://
abhietc.com:8020/user/abhietc/abc.drl --class com.abc.StartMain
abc-0.0.1-SNAPSHOT-jar-with-dependencies.jar hdfs://
abhietc.com:8020/user/abhietc/abc.drl

spark-submit --master yarn --deploy-mode cluster  --files hdfs://
abc.com:8020/tmp/abc.drl --class com.abc.StartMain
abc-0.0.1-SNAPSHOT-jar-with-dependencies.jar hdfs://abc.com:8020/tmp/abc.drl


Error Messages:
Surprisingly, we are not doing any write operation on the reference file, but the
log still shows that the application is trying to write to the file instead of
reading it.
Also, the log shows a FileNotFoundException for both the HDFS and local paths.
-
16/09/20 14:49:50 ERROR scheduler.JobScheduler: Error running job streaming
job 1474363176000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage
1.0 (TID 4, abc.com): java.lang.RuntimeException: Unable to write Resource:
FileResource[file=hdfs:/abc.com:8020/user/abhietc/abc.drl]
at
org.drools.compiler.kie.builder.impl.KieFileSystemImpl.write(KieFileSystemImpl.java:71)
at
com.hmrc.taxcalculator.KieSessionFactory$.getNewSession(KieSessionFactory.scala:49)
at
com.hmrc.taxcalculator.KieSessionFactory$.getKieSession(KieSessionFactory.scala:21)
at
com.hmrc.taxcalculator.KieSessionFactory$.execute(KieSessionFactory.scala:27)
at
com.abc.StartMain$$anonfun$main$1$$anonfun$4.apply(TaxCalculatorMain.scala:124)
at
com.abc.StartMain$$anonfun$main$1$$anonfun$4.apply(TaxCalculatorMain.scala:124)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: hdfs:/
abc.com:8020/user/abhietc/abc.drl (No such file or directory)
at java.io.FileInputStream.open(Native Method)
at java.io.FileInputStream.<init>(FileInputStream.java:146)
at
org.drools.core.io.impl.FileSystemResource.getInputStream(FileSystemResource.java:123)
at
org.drools.compiler.kie.builder.impl.KieFileSystemImpl.write(KieFileSystemImpl.java:58)
... 19 more
--
Cheers,
Abhishek


Re: Finding unique across all columns in dataset

2016-09-19 Thread Abhishek Anand
Hi Ayan,

How will I get column wise distinct items using this approach ?

On Mon, Sep 19, 2016 at 3:31 PM, ayan guha <guha.a...@gmail.com> wrote:

> Create an array out of cilumns, convert to Dataframe,
> explode,distinct,write.
> On 19 Sep 2016 19:11, "Saurav Sinha" <sauravsinh...@gmail.com> wrote:
>
>> You can use distinct over you data frame or rdd
>>
>> rdd.distinct
>>
>> It will give you distinct across your row.
>>
>> On Mon, Sep 19, 2016 at 2:35 PM, Abhishek Anand <abhis.anan...@gmail.com>
>> wrote:
>>
>>> I have an rdd which contains 14 different columns. I need to find the
>>> distinct across all the columns of rdd and write it to hdfs.
>>>
>>> How can I acheive this ?
>>>
>>> Is there any distributed data structure that I can use and keep on
>>> updating it as I traverse the new rows ?
>>>
>>> Regards,
>>> Abhi
>>>
>>
>>
>>
>> --
>> Thanks and Regards,
>>
>> Saurav Sinha
>>
>> Contact: 9742879062
>>
>
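For reference, a rough sketch of the array/explode route ayan mentions above, assuming the
14 columns are already in a DataFrame df and casting every cell to string; the output path
is a placeholder:

import org.apache.spark.sql.functions._

// one (column, value) struct per cell, exploded into rows, then distinct per column
val cellStructs = df.columns.map(c => struct(lit(c).as("col"), col(c).cast("string").as("value")))
val distinctPerColumn = df
  .select(explode(array(cellStructs: _*)).as("cv"))
  .select("cv.col", "cv.value")
  .distinct()

distinctPerColumn.write.csv("hdfs:///tmp/distinct_per_column")   // placeholder output path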


Finding unique across all columns in dataset

2016-09-19 Thread Abhishek Anand
I have an rdd which contains 14 different columns. I need to find the
distinct values across all the columns of the rdd and write them to hdfs.

How can I achieve this?

Is there any distributed data structure that I can use and keep updating
as I traverse the new rows?

Regards,
Abhi


UNSUBSCRIBE

2016-08-09 Thread abhishek singh



Relative path in absolute URI

2016-08-03 Thread Abhishek Ranjan
Hi All,

I am trying to use spark 2.0 with hadoop-hdfs 2.7.2 and scala 2.11. I am
not able to use the below API to load a file from my local hdfs.

spark.sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load(path)
  .show()

java.net.URISyntaxException: Relative path in absolute URI:
file:C:/test/sampleApp/spark-warehouse

Complete details are available here:
http://stackoverflow.com/questions/38687811/not-able-to-load-file-from-hdfs-in-spark-dataframe

Somehow it is picking up an incorrect path. Has anyone encountered a similar
problem with spark 2.0?

With Thanks,
Abhishek


Re: Concatenate the columns in dataframe to create new collumns using Java

2016-07-18 Thread Abhishek Anand
Thanks Nihed.

I was able to do this in exactly the same way.


Cheers!!
Abhi

On Mon, Jul 18, 2016 at 5:56 PM, nihed mbarek <nihe...@gmail.com> wrote:

> and if we have this static method
> df.show();
> Column c = concatFunction(df, "l1", "firstname,lastname");
> df.select(c).show();
>
> with this code :
> Column concatFunction(DataFrame df, String fieldName, String columns) {
> String[] array = columns.split(",");
> Column[] concatColumns = new Column[array.length];
> for (int i = 0; i < concatColumns.length; i++) {
> concatColumns[i]=df.col(array[i]);
> }
>
> return functions.concat(concatColumns).alias(fieldName);
> }
>
>
>
> On Mon, Jul 18, 2016 at 2:14 PM, Abhishek Anand <abhis.anan...@gmail.com>
> wrote:
>
>> Hi Nihed,
>>
>> Thanks for the reply.
>>
>> I am looking for something like this :
>>
>> DataFrame training = orgdf.withColumn("I1",
>> functions.concat(orgdf.col("C0"),orgdf.col("C1")));
>>
>>
>> Here I have to give C0 and C1 columns, I am looking to write a generic
>> function that concatenates the columns depending on input columns.
>>
>> like if I have something
>> String str = "C0,C1,C2"
>>
>> Then it should work as
>>
>> DataFrame training = orgdf.withColumn("I1",
>> functions.concat(orgdf.col("C0"),orgdf.col("C1"),orgdf.col("C2")));
>>
>>
>>
>> Thanks,
>> Abhi
>>
>> On Mon, Jul 18, 2016 at 4:39 PM, nihed mbarek <nihe...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>>
>>> I just wrote this code to help you. Is it what you need ??
>>>
>>>
>>> SparkConf conf = new
>>> SparkConf().setAppName("hello").setMaster("local");
>>> JavaSparkContext sc = new JavaSparkContext(conf);
>>> SQLContext sqlContext = new SQLContext(sc);
>>> List persons = new ArrayList<>();
>>> persons.add(new Person("nihed", "mbarek", "nihed.com"));
>>> persons.add(new Person("mark", "zuckerberg", "facebook.com"));
>>>
>>> DataFrame df = sqlContext.createDataFrame(persons, Person.class);
>>>
>>> df.show();
>>> final String[] columns = df.columns();
>>> Column[] selectColumns = new Column[columns.length + 1];
>>> for (int i = 0; i < columns.length; i++) {
>>> selectColumns[i]=df.col(columns[i]);
>>> }
>>>
>>>
>>> selectColumns[columns.length]=functions.concat(df.col("firstname"),
>>> df.col("lastname"));
>>>
>>> df.select(selectColumns).show();
>>>   ---
>>> public static class Person {
>>>
>>> private String firstname;
>>> private String lastname;
>>> private String address;
>>> }
>>>
>>>
>>>
>>> Regards,
>>>
>>> On Mon, Jul 18, 2016 at 12:45 PM, Abhishek Anand <
>>> abhis.anan...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a dataframe say having C0,C1,C2 and so on as columns.
>>>>
>>>> I need to create interaction variables to be taken as input for my
>>>> program.
>>>>
>>>> For eg -
>>>>
>>>> I need to create I1 as concatenation of C0,C3,C5
>>>>
>>>> Similarly, I2  = concat(C4,C5)
>>>>
>>>> and so on ..
>>>>
>>>>
>>>> How can I achieve this in my Java code for concatenation of any columns
>>>> given input by the user.
>>>>
>>>> Thanks,
>>>> Abhi
>>>>
>>>
>>>
>>>
>>> --
>>>
>>> M'BAREK Med Nihed,
>>> Fedora Ambassador, TUNISIA, Northern Africa
>>> http://www.nihed.com
>>>
>>> <http://tn.linkedin.com/in/nihed>
>>>
>>>
>>
>
>
> --
>
> M'BAREK Med Nihed,
> Fedora Ambassador, TUNISIA, Northern Africa
> http://www.nihed.com
>
> <http://tn.linkedin.com/in/nihed>
>
>


Re: Concatenate the columns in dataframe to create new collumns using Java

2016-07-18 Thread Abhishek Anand
Hi Nihed,

Thanks for the reply.

I am looking for something like this :

DataFrame training = orgdf.withColumn("I1",
functions.concat(orgdf.col("C0"),orgdf.col("C1")));


Here I have to give C0 and C1 columns, I am looking to write a generic
function that concatenates the columns depending on input columns.

like if I have something
String str = "C0,C1,C2"

Then it should work as

DataFrame training = orgdf.withColumn("I1",
functions.concat(orgdf.col("C0"),orgdf.col("C1"),orgdf.col("C2")));



Thanks,
Abhi

On Mon, Jul 18, 2016 at 4:39 PM, nihed mbarek <nihe...@gmail.com> wrote:

> Hi,
>
>
> I just wrote this code to help you. Is it what you need ??
>
>
> SparkConf conf = new
> SparkConf().setAppName("hello").setMaster("local");
> JavaSparkContext sc = new JavaSparkContext(conf);
> SQLContext sqlContext = new SQLContext(sc);
> List persons = new ArrayList<>();
> persons.add(new Person("nihed", "mbarek", "nihed.com"));
> persons.add(new Person("mark", "zuckerberg", "facebook.com"));
>
> DataFrame df = sqlContext.createDataFrame(persons, Person.class);
>
> df.show();
> final String[] columns = df.columns();
> Column[] selectColumns = new Column[columns.length + 1];
> for (int i = 0; i < columns.length; i++) {
> selectColumns[i]=df.col(columns[i]);
> }
>
>
> selectColumns[columns.length]=functions.concat(df.col("firstname"),
> df.col("lastname"));
>
> df.select(selectColumns).show();
>   ---
> public static class Person {
>
> private String firstname;
> private String lastname;
> private String address;
> }
>
>
>
> Regards,
>
> On Mon, Jul 18, 2016 at 12:45 PM, Abhishek Anand <abhis.anan...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I have a dataframe say having C0,C1,C2 and so on as columns.
>>
>> I need to create interaction variables to be taken as input for my
>> program.
>>
>> For eg -
>>
>> I need to create I1 as concatenation of C0,C3,C5
>>
>> Similarly, I2  = concat(C4,C5)
>>
>> and so on ..
>>
>>
>> How can I achieve this in my Java code for concatenation of any columns
>> given input by the user.
>>
>> Thanks,
>> Abhi
>>
>
>
>
> --
>
> M'BAREK Med Nihed,
> Fedora Ambassador, TUNISIA, Northern Africa
> http://www.nihed.com
>
> <http://tn.linkedin.com/in/nihed>
>
>


Concatenate the columns in dataframe to create new collumns using Java

2016-07-18 Thread Abhishek Anand
Hi,

I have a dataframe say having C0,C1,C2 and so on as columns.

I need to create interaction variables to be taken as input for my program.

For eg -

I need to create I1 as concatenation of C0,C3,C5

Similarly, I2  = concat(C4,C5)

and so on ..


How can I achieve this in my Java code for concatenation of any columns
given input by the user.

Thanks,
Abhi


Change spark dataframe to LabeledPoint in Java

2016-06-30 Thread Abhishek Anand
Hi ,

I have a dataframe which I want to convert to a labeled point.

DataFrame labeleddf = model.transform(newdf).select("label","features");

How can I convert this to a LabeledPoint to use in my Logistic Regression
model.

I could do this in scala using
val trainData = labeleddf.map(row =>
LabeledPoint(row.getDouble(0), row(1).asInstanceOf[Vector])).cache()


How can I achieve the same in Java?

Thanks,
Abhi


Re: spark.hadoop.dfs.replication parameter not working for kafka-spark streaming

2016-05-31 Thread Abhishek Anand
I also tried


jsc.sparkContext().sc().hadoopConfiguration().set("dfs.replication", "2")

But it's still not working.

Any ideas why it's not working?


Abhi

On Tue, May 31, 2016 at 4:03 PM, Abhishek Anand <abhis.anan...@gmail.com>
wrote:

> My spark streaming checkpoint directory is being written to HDFS with
> default replication factor of 3.
>
> In my streaming application where I am listening from kafka and setting
> the dfs.replication = 2 as below the files are still being written with
> replication factor=3
>
> SparkConf sparkConfig = new
> SparkConf().setMaster("mymaster").set("spark.hadoop.dfs.replication", "2");
>
> Is there anything else that I need to do ??
>
>
> Thanks !!!
> Abhi
>


spark.hadoop.dfs.replication parameter not working for kafka-spark streaming

2016-05-31 Thread Abhishek Anand
My spark streaming checkpoint directory is being written to HDFS with
default replication factor of 3.

In my streaming application, where I am listening to kafka and setting
dfs.replication = 2 as below, the files are still being written with
replication factor 3.

SparkConf sparkConfig = new
SparkConf().setMaster("mymaster").set("spark.hadoop.dfs.replication", "2");

Is there anything else that I need to do ??


Thanks !!!
Abhi


Re: Running glm in sparkR (data pre-processing step)

2016-05-30 Thread Abhishek Anand
Thanks Yanbo.

So, you mean that if I have a variable which is of type double but I want
to treat it like String in my model I just have to cast those columns into
string and simply run the glm model. String columns will be directly
one-hot encoded by the glm provided by sparkR ?

Just wanted to clarify as in R we need to apply as.factor for categorical
variables.

val dfNew = df.withColumn("C0",df.col("C0").cast("String"))


Abhi !!

On Mon, May 30, 2016 at 2:58 PM, Yanbo Liang <yblia...@gmail.com> wrote:

> Hi Abhi,
>
> In SparkR glm, category features (columns of type string) will be one-hot
> encoded automatically.
> So pre-processing like `as.factor` is not necessary, you can directly feed
> your data to the model training.
>
> Thanks
> Yanbo
>
> 2016-05-30 2:06 GMT-07:00 Abhishek Anand <abhis.anan...@gmail.com>:
>
>> Hi ,
>>
>> I want to run glm variant of sparkR for my data that is there in a csv
>> file.
>>
>> I see that the glm function in sparkR takes a spark dataframe as input.
>>
>> Now, when I read a file from csv and create a spark dataframe, how could
>> I take care of the factor variables/columns in my data ?
>>
>> Do I need to convert it to a R dataframe, convert to factor using
>> as.factor and create spark dataframe and run glm over it ?
>>
>> But, running as.factor over big dataset is not possible.
>>
>> Please suggest what is the best way to acheive this ?
>>
>> What pre-processing should be done, and what is the best way to achieve
>> it  ?
>>
>>
>> Thanks,
>> Abhi
>>
>
>


Running glm in sparkR (data pre-processing step)

2016-05-30 Thread Abhishek Anand
Hi ,

I want to run the glm variant of sparkR on my data that is in a csv file.

I see that the glm function in sparkR takes a spark dataframe as input.

Now, when I read a file from csv and create a spark dataframe, how can I
take care of the factor variables/columns in my data?

Do I need to convert it to an R dataframe, convert to factors using as.factor,
and then create a spark dataframe and run glm over it?

But running as.factor over a big dataset is not possible.

Please suggest the best way to achieve this.

What pre-processing should be done, and what is the best way to achieve it?


Thanks,
Abhi


RE: GraphX Java API

2016-05-29 Thread Kumar, Abhishek (US - Bengaluru)
Hey,

I see some graphx packages listed here:
http://spark.apache.org/docs/latest/api/java/index.html
- org.apache.spark.graphx<http://spark.apache.org/docs/latest/api/java/org/apache/spark/graphx/package-frame.html>
- org.apache.spark.graphx.impl<http://spark.apache.org/docs/latest/api/java/org/apache/spark/graphx/impl/package-frame.html>
- org.apache.spark.graphx.lib<http://spark.apache.org/docs/latest/api/java/org/apache/spark/graphx/lib/package-frame.html>
- org.apache.spark.graphx.util<http://spark.apache.org/docs/latest/api/java/org/apache/spark/graphx/util/package-frame.html>
Aren’t they meant to be used with JAVA?
Thanks

From: Santoshakhilesh [mailto:santosh.akhil...@huawei.com]
Sent: Friday, May 27, 2016 4:52 PM
To: Kumar, Abhishek (US - Bengaluru) <abhishekkuma...@deloitte.com>; 
user@spark.apache.org
Subject: RE: GraphX Java API

GraphX APIs are available only in Scala. If you need to use GraphX you need to 
switch to Scala.
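For reference, a toy Scala example (runnable in the spark-shell; the vertex and edge data
are made up):

import org.apache.spark.graphx._

// a two-vertex, one-edge toy graph
val vertices = sc.parallelize(Seq((1L, "alice"), (2L, "bob")))
val edges = sc.parallelize(Seq(Edge(1L, 2L, "follows")))
val graph = Graph(vertices, edges)
println(graph.numVertices + " vertices, " + graph.numEdges + " edges")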

From: Kumar, Abhishek (US - Bengaluru) [mailto:abhishekkuma...@deloitte.com]
Sent: 27 May 2016 19:59
To: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: GraphX Java API

Hi,

We are trying to consume the Java API for GraphX, but there is no documentation 
available online on the usage or examples. It would be great if we could get 
some examples in Java.

Thanks and regards,

Abhishek Kumar






GraphX Java API

2016-05-27 Thread Kumar, Abhishek (US - Bengaluru)
Hi,

We are trying to consume the Java API for GraphX, but there is no documentation 
available online on the usage or examples. It would be great if we could get 
some examples in Java.

Thanks and regards,

Abhishek Kumar
Products & Services | iLab
Deloitte Consulting LLP
Block ‘C’, Divyasree Technopolis, Survey No.: 123 & 132/2, Yemlur Post, Yemlur, 
Bengaluru – 560037, Karnataka, India
Mobile: +91 7736795770
abhishekkuma...@deloitte.com<mailto:abhishekkuma...@deloitte.com> | 
www.deloitte.com<http://www.deloitte.com/>


Calculating log-loss for the trained model in Spark ML

2016-05-03 Thread Abhishek Anand
I am building a ML pipeline for logistic regression.

val lr = new LogisticRegression()

lr.setMaxIter(100).setRegParam(0.001)

val pipeline = new
Pipeline().setStages(Array(geoDimEncoder,clientTypeEncoder,
   devTypeDimIdEncoder,pubClientIdEncoder,tmpltIdEncoder,
   hourEncoder,assembler,lr))

val model = pipeline.fit(trainingDF)

Now, when the model is trained, I want to see the values of
the probabilities for the training set and compute certain
validation parameters like log-loss. But I am unable to find
these using "model".

The only thing I could find is

model.transform(testDF).select()

Can't I get the metrics on the training set, for training-set validation?
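A rough sketch of computing log-loss by hand from the transform output, assuming the
default "probability" output column; the Vector import depends on the Spark version
(mllib.linalg on 1.x, ml.linalg on 2.x) and the clipping epsilon is arbitrary:

import org.apache.spark.mllib.linalg.Vector   // use org.apache.spark.ml.linalg.Vector on Spark 2.x

val scored = model.transform(trainingDF).select("label", "probability")
val eps = 1e-15
val logLoss = scored.rdd.map { row =>
  val y = row.getDouble(0)
  val p = math.min(math.max(row.getAs[Vector](1)(1), eps), 1 - eps)   // P(label = 1), clipped
  -(y * math.log(p) + (1 - y) * math.log(1 - p))
}.mean()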

Thanks !!


Re: removing header from csv file

2016-05-03 Thread Abhishek Anand
You can use this function to remove the header from your dataset (applicable to
RDDs):

import org.apache.spark.rdd.RDD

def dropHeader(data: RDD[String]): RDD[String] = {
  data.mapPartitionsWithIndex((idx, lines) =>
    if (idx == 0) lines.drop(1) else lines
  )
}
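A quick usage sketch (the input path is just a placeholder):

val raw = sc.textFile("hdfs:///data/input.csv")   // placeholder path
val withoutHeader = dropHeader(raw)
withoutHeader.take(5).foreach(println)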


Abhi

On Wed, Apr 27, 2016 at 12:55 PM, Marco Mistroni 
wrote:

> If u r using Scala api you can do
> Myrdd.zipWithIndex.filter(_._2 > 0).map(_._1)
>
> Maybe a little bit complicated but will do the trick
> As per spark CSV, you will get back a data frame which you can reconduct
> to rdd. .
> Hth
> Marco
> On 27 Apr 2016 6:59 am, "nihed mbarek"  wrote:
>
>> You can add a filter with string that you are sure available only in the
>> header
>>
>> Le mercredi 27 avril 2016, Divya Gehlot  a
>> écrit :
>>
>>> yes you can remove the headers by removing the first row
>>>
>>> can first() or head() to do that
>>>
>>>
>>> Thanks,
>>> Divya
>>>
>>> On 27 April 2016 at 13:24, Ashutosh Kumar 
>>> wrote:
>>>
 I see there is a library spark-csv which can be used for removing
 header and processing of csv files. But it seems it works with sqlcontext
 only. Is there a way to remove header from csv files without sqlcontext ?

 Thanks
 Ashutosh

>>>
>>>
>>
>> --
>>
>> M'BAREK Med Nihed,
>> Fedora Ambassador, TUNISIA, Northern Africa
>> http://www.nihed.com
>>
>> 
>>
>>
>>


Clear Threshold in Logistic Regression ML Pipeline

2016-05-03 Thread Abhishek Anand
Hi All,

I am trying to build a logistic regression pipeline in ML.

How can I clear the threshold, which by default is 0.5 ? In mllib I am able
to clear the threshold to get the raw predictions using the
model.clearThreshold() function.


Regards,
Abhi
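
To my knowledge spark.ml does not expose clearThreshold() the way mllib does,
but the raw scores and probabilities are always emitted as columns on the
transformed DataFrame, so they can be read directly regardless of the
threshold. A small sketch, assuming `model` is the fitted pipeline and
`testDF` a DataFrame:

import org.apache.spark.ml.classification.LogisticRegression

val scored = model.transform(testDF)
scored.select("rawPrediction", "probability", "prediction").show(5)

// the threshold that drives the "prediction" column can still be set on the
// estimator before fitting
val lr = new LogisticRegression().setMaxIter(100).setRegParam(0.001).setThreshold(0.5)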


RE: removing header from csv file

2016-04-27 Thread Mishra, Abhishek
You should be doing something like this:


data = sc.textFile('file:///path1/path/test1.csv')
header = data.first() #extract header
#print header
data = data.filter(lambda x: x != header)
#print data
Hope it helps.

Sincerely,
Abhishek
+91-7259028700

From: nihed mbarek [mailto:nihe...@gmail.com]
Sent: Wednesday, April 27, 2016 11:29 AM
To: Divya Gehlot
Cc: Ashutosh Kumar; user @spark
Subject: Re: removing header from csv file

You can add a filter with string that you are sure available only in the header

Le mercredi 27 avril 2016, Divya Gehlot 
<divya.htco...@gmail.com<mailto:divya.htco...@gmail.com>> a écrit :
yes you can remove the headers by removing the first row

you can use first() or head() to do that


Thanks,
Divya

On 27 April 2016 at 13:24, Ashutosh Kumar 
<kmr.ashutos...@gmail.com<javascript:_e(%7B%7D,'cvml','kmr.ashutos...@gmail.com');>>
 wrote:
I see there is a library spark-csv which can be used for removing header and 
processing of csv files. But it seems it works with sqlcontext only. Is there a 
way to remove header from csv files without sqlcontext ?
Thanks
Ashutosh



--

M'BAREK Med Nihed,
Fedora Ambassador, TUNISIA, Northern Africa
http://www.nihed.com




Fwd: Facing Unusual Behavior with the executors in spark streaming

2016-04-05 Thread Abhishek Anand
Hi ,

Needed inputs for a couple of issue that I am facing in my production
environment.

I am using spark version 1.4.0 spark streaming.

1) It so happens that the worker is lost on a machine and the executor
still shows up in the executors tab in the UI.

Even when I kill a worker using the kill -9 command, the worker and executor
both die on that machine, but the executor still shows up in the executors tab
on the UI. The number of active tasks sometimes shows as negative on that
executor, and my job keeps failing with the following exception.

This usually happens when a job is running. When no computation is taking
place on the cluster (i.e. suppose a 1 min batch gets completed in 20 secs)
and I kill the worker, the executor entry is also gone from the UI; but when
I kill the worker while a job is still running, I always run into this issue.


16/04/01 23:54:20 WARN TaskSetManager: Lost task 141.0 in stage 19859.0
(TID 190333, 192.168.33.96): java.io.IOException: Failed to connect to /
192.168.33.97:63276
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: /
192.168.33.97:63276
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
at
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
at
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
... 1 more



When I relaunch the worker, new executors are added, but the dead one's
entry is still there until the application is killed.

2) Another issue is that when the disk becomes full on one of the workers, the
executor becomes unresponsive and the job gets stuck at a particular stage. The
exception that I can see in the executor logs is


 16/03/29 10:49:00 ERROR DiskBlockObjectWriter: Uncaught exception while
reverting partial writes to file
/data/spark-e2fc248f-a212-4a99-9d6c-4e52d6a69070/executor-37679a6c-cb96-451e-a284-64d6b4fe9910/blockmgr-f8ca72f4-f329-468b-8e65-ef97f8fb285c/38/temp_shuffle_8f266d70-3fc6-41e5-bbaa-c413a7b08ea4
java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:315)
at
org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at org.xerial.snappy.SnappyOutputStream.flush(SnappyOutputStream.java:274)


As a workaround I have to kill the executor, clear the space on disk, and wait
for a new executor to be relaunched by the worker, after which the failed
stages are recomputed. But is it really the case that when the disk is full on
one machine my application gets stuck ?




This is really becoming a bottleneck and leads to instability of my
production stack.

Please share your insights on this.


Thanks,
Abhi


Re: Disk Full on one Worker is leading to Job Stuck and Executor Unresponsive

2016-04-01 Thread Abhishek Anand
Hi Ted,

Any thoughts on this ???

I am getting the same kind of error when I kill a worker on one of the
machines.
Even after killing the worker using the kill -9 command, the executor shows up
on the Spark UI with negative active tasks.

All the tasks on that worker start to fail with the following exception.


16/04/01 23:54:20 WARN TaskSetManager: Lost task 141.0 in stage 19859.0
(TID 190333, 192.168.33.96): java.io.IOException: Failed to connect to /
192.168.33.97:63276
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: /
192.168.33.97:63276
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
at
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
at
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
... 1 more




Cheers !!
Abhi

On Fri, Apr 1, 2016 at 9:04 AM, Abhishek Anand <abhis.anan...@gmail.com>
wrote:

> This is what I am getting in the executor logs
>
> 16/03/29 10:49:00 ERROR DiskBlockObjectWriter: Uncaught exception while
> reverting partial writes to file
> /data/spark-e2fc248f-a212-4a99-9d6c-4e52d6a69070/executor-37679a6c-cb96-451e-a284-64d6b4fe9910/blockmgr-f8ca72f4-f329-468b-8e65-ef97f8fb285c/38/temp_shuffle_8f266d70-3fc6-41e5-bbaa-c413a7b08ea4
> java.io.IOException: No space left on device
> at java.io.FileOutputStream.writeBytes(Native Method)
> at java.io.FileOutputStream.write(FileOutputStream.java:315)
> at
> org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
> at org.xerial.snappy.SnappyOutputStream.flush(SnappyOutputStream.java:274)
>
>
>
> It happens every time the disk is full.
>
> On Fri, Apr 1, 2016 at 2:18 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Can you show the stack trace ?
>>
>> The log message came from
>> DiskBlockObjectWriter#revertPartialWritesAndClose().
>> Unfortunately, the method doesn't throw exception, making it a bit hard
>> for caller to know of the disk full condition.
>>
>> On Thu, Mar 31, 2016 at 11:32 AM, Abhishek Anand <abhis.anan...@gmail.com
>> > wrote:
>>
>>>
>>> Hi,
>>>
>>> Why is it so that when my disk space is full on one of the workers then
>>> the executor on that worker becomes unresponsive and the jobs on that
>>> worker fails with the exception
>>>
>>>
>>> 16/03/29 10:49:00 ERROR DiskBlockObjectWriter: Uncaught exception while
>>> reverting partial writes to file
>>> /data/spark-e2fc248f-a212-4a99-9d6c-4e52d6a69070/executor-37679a6c-cb96-451e-a284-64d6b4fe9910/blockmgr-f8ca72f4-f329-468b-8e65-ef97f8fb285c/38/temp_shuffle_8f266d70-3fc6-41e5-bbaa-c413a7b08ea4
>>> java.io.IOException: No space left on device
>>>
>>>
>>> This is leading to my job getting stuck.
>>>
>>> As a workaround I have to kill the executor, clear the space on disk and
>>> new executor  relaunched by the worker and the failed stages are recomputed.
>>>
>>>
>>> How can I get rid of this problem i.e why my job get stuck on disk full
>>> issue on one of the workers ?
>>>
>>>
>>> Cheers !!!
>>> Abhi
>>>
>>>
>>
>


Re: Disk Full on one Worker is leading to Job Stuck and Executor Unresponsive

2016-03-31 Thread Abhishek Anand
This is what I am getting in the executor logs

16/03/29 10:49:00 ERROR DiskBlockObjectWriter: Uncaught exception while
reverting partial writes to file
/data/spark-e2fc248f-a212-4a99-9d6c-4e52d6a69070/executor-37679a6c-cb96-451e-a284-64d6b4fe9910/blockmgr-f8ca72f4-f329-468b-8e65-ef97f8fb285c/38/temp_shuffle_8f266d70-3fc6-41e5-bbaa-c413a7b08ea4
java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:315)
at
org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at org.xerial.snappy.SnappyOutputStream.flush(SnappyOutputStream.java:274)



It happens every time the disk is full.

On Fri, Apr 1, 2016 at 2:18 AM, Ted Yu <yuzhih...@gmail.com> wrote:

> Can you show the stack trace ?
>
> The log message came from
> DiskBlockObjectWriter#revertPartialWritesAndClose().
> Unfortunately, the method doesn't throw exception, making it a bit hard
> for caller to know of the disk full condition.
>
> On Thu, Mar 31, 2016 at 11:32 AM, Abhishek Anand <abhis.anan...@gmail.com>
> wrote:
>
>>
>> Hi,
>>
>> Why is it so that when my disk space is full on one of the workers then
>> the executor on that worker becomes unresponsive and the jobs on that
>> worker fails with the exception
>>
>>
>> 16/03/29 10:49:00 ERROR DiskBlockObjectWriter: Uncaught exception while
>> reverting partial writes to file
>> /data/spark-e2fc248f-a212-4a99-9d6c-4e52d6a69070/executor-37679a6c-cb96-451e-a284-64d6b4fe9910/blockmgr-f8ca72f4-f329-468b-8e65-ef97f8fb285c/38/temp_shuffle_8f266d70-3fc6-41e5-bbaa-c413a7b08ea4
>> java.io.IOException: No space left on device
>>
>>
>> This is leading to my job getting stuck.
>>
>> As a workaround I have to kill the executor, clear the space on disk and
>> new executor  relaunched by the worker and the failed stages are recomputed.
>>
>>
>> How can I get rid of this problem i.e why my job get stuck on disk full
>> issue on one of the workers ?
>>
>>
>> Cheers !!!
>> Abhi
>>
>>
>


Disk Full on one Worker is leading to Job Stuck and Executor Unresponsive

2016-03-31 Thread Abhishek Anand
Hi,

Why is it that when the disk space is full on one of the workers, the
executor on that worker becomes unresponsive and the jobs on that worker
fail with the exception


16/03/29 10:49:00 ERROR DiskBlockObjectWriter: Uncaught exception while
reverting partial writes to file
/data/spark-e2fc248f-a212-4a99-9d6c-4e52d6a69070/executor-37679a6c-cb96-451e-a284-64d6b4fe9910/blockmgr-f8ca72f4-f329-468b-8e65-ef97f8fb285c/38/temp_shuffle_8f266d70-3fc6-41e5-bbaa-c413a7b08ea4
java.io.IOException: No space left on device


This is leading to my job getting stuck.

As a workaround I have to kill the executor, clear the space on disk, and wait
for a new executor to be relaunched by the worker; the failed stages are then
recomputed.


How can I get rid of this problem, i.e. why does my job get stuck on a
disk-full issue on one of the workers ?


Cheers !!!
Abhi


Output the data to external database at particular time in spark streaming

2016-03-08 Thread Abhishek Anand
I have a spark streaming job where I am aggregating the data by doing
reduceByKeyAndWindow with an inverse function.

I am keeping the data in memory for up to 2 hours, and in order to output the
reduced data to an external storage I conditionally need to push the data
to the DB, say at the 15th minute of each hour.

How can this be achieved ?


Regards,
Abhi
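
A minimal sketch of one way to do this with foreachRDD, assuming
`windowedStream` is the DStream produced by reduceByKeyAndWindow and
`writeToDb` is a hypothetical function that persists one batch to the
external store:

import java.util.Calendar

windowedStream.foreachRDD { (rdd, time) =>
  val cal = Calendar.getInstance()
  cal.setTimeInMillis(time.milliseconds)
  // only flush batches whose batch time falls on the 15th minute of the hour
  if (cal.get(Calendar.MINUTE) == 15) {
    writeToDb(rdd)
  }
}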


Re: Stateful Operation on JavaPairDStream Help Needed !!

2016-02-29 Thread Abhishek Anand
Hi Ryan,

It's not working even after removing the reduceByKey.

So, basically I am doing the following
- reading from kafka
- flatmap inside transform
- mapWithState
- rdd.count on output of mapWithState

But to my surprise I still don't see checkpointing taking place.

Is there any restriction to the type of operation that we can perform
inside mapWithState ?

I really need to resolve this one, as currently, if my application is
restarted from checkpoint, it has to repartition 120 previous stages, which
takes a huge amount of time.

Thanks !!
Abhi

On Mon, Feb 29, 2016 at 3:42 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com
> wrote:

> Sorry that I forgot to tell you that you should also call `rdd.count()`
> for "reduceByKey" as well. Could you try it and see if it works?
>
> On Sat, Feb 27, 2016 at 1:17 PM, Abhishek Anand <abhis.anan...@gmail.com>
> wrote:
>
>> Hi Ryan,
>>
>> I am using mapWithState after doing reduceByKey.
>>
>> I am right now using mapWithState as you suggested and triggering the
>> count manually.
>>
>> But, still unable to see any checkpointing taking place. In the DAG I can
>> see that the reduceByKey operation for the previous batches are also being
>> computed.
>>
>>
>> Thanks
>> Abhi
>>
>>
>> On Tue, Feb 23, 2016 at 2:36 AM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> Hey Abhi,
>>>
>>> Using reducebykeyandwindow and mapWithState will trigger the bug
>>> in SPARK-6847. Here is a workaround to trigger checkpoint manually:
>>>
>>> JavaMapWithStateDStream<...> stateDStream =
>>> myPairDstream.mapWithState(StateSpec.function(mappingFunc));
>>> stateDStream.foreachRDD(new Function1<...>() {
>>>   @Override
>>>   public Void call(JavaRDD<...> rdd) throws Exception {
>>> rdd.count();
>>>   }
>>> });
>>> return stateDStream.stateSnapshots();
>>>
>>>
>>> On Mon, Feb 22, 2016 at 12:25 PM, Abhishek Anand <
>>> abhis.anan...@gmail.com> wrote:
>>>
>>>> Hi Ryan,
>>>>
>>>> Reposting the code.
>>>>
>>>> Basically my use case is something like - I am receiving the web
>>>> impression logs and may get the notify (listening from kafka) for those
>>>> impressions in the same interval (for me its 1 min) or any next interval
>>>> (upto 2 hours). Now, when I receive notify for a particular impression I
>>>> need to swap the date field in impression with the date field in notify
>>>> logs. The notify for an impression has the same key as impression.
>>>>
>>>> static Function3<String, Optional, State,
>>>> Tuple2<String, MyClass>> mappingFunc =
>>>> new Function3<String, Optional, State, Tuple2<String,
>>>> MyClass>>() {
>>>> @Override
>>>> public Tuple2<String, MyClass> call(String key, Optional one,
>>>> State state) {
>>>> MyClass nullObj = new MyClass();
>>>> nullObj.setImprLog(null);
>>>> nullObj.setNotifyLog(null);
>>>> MyClass current = one.or(nullObj);
>>>>
>>>> if(current!= null && current.getImprLog() != null &&
>>>> current.getMyClassType() == 1 /*this is impression*/){
>>>> return new Tuple2<>(key, null);
>>>> }
>>>> else if (current.getNotifyLog() != null  && current.getMyClassType() ==
>>>> 3 /*notify for the impression received*/){
>>>> MyClass oldState = (state.exists() ? state.get() : nullObj);
>>>> if(oldState!= null && oldState.getNotifyLog() != null){
>>>> oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>>>  //swappping the dates
>>>> return new Tuple2<>(key, oldState);
>>>> }
>>>> else{
>>>> return new Tuple2<>(key, null);
>>>> }
>>>> }
>>>> else{
>>>> return new Tuple2<>(key, null);
>>>> }
>>>>
>>>> }
>>>> };
>>>>
>>>>
>>>> return
>>>> myPairDstream.mapWithState(StateSpec.function(mappingFunc)).stateSnapshots();
>>>>
>>>>
>>>> Currently I am using reducebykeyandwindow without the inverse function
>>>> and I am able to get the correct data. But, issue the might arise is when I
>>>> have to restart my application from checkpoint and it repartitions and
&g

Re: java.io.IOException: java.lang.reflect.InvocationTargetException on new spark machines

2016-02-29 Thread Abhishek Anand
Hi Ryan,

I was able to resolve this issue. The /tmp location was mounted with the
"noexec" option. Removing this noexec in the fstab resolved the issue. The
snappy shared object file is created at the /tmp location, so either removing
the noexec from the mount or changing the default temp location solves
the issue.

export _JAVA_OPTIONS=-Djava.io.tmpdir=/mycustometemplocation



Thanks !!
Abhi


On Mon, Feb 29, 2016 at 3:46 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com
> wrote:

> This is because the Snappy library cannot load the native library. Did you
> forget to install the snappy native library in your new machines?
>
> On Fri, Feb 26, 2016 at 11:05 PM, Abhishek Anand <abhis.anan...@gmail.com>
> wrote:
>
>> Any insights on this ?
>>
>> On Fri, Feb 26, 2016 at 1:21 PM, Abhishek Anand <abhis.anan...@gmail.com>
>> wrote:
>>
>>> On changing the default compression codec which is snappy to lzf the
>>> errors are gone !!
>>>
>>> How can I fix this using snappy as the codec ?
>>>
>>> Is there any downside of using lzf as snappy is the default codec that
>>> ships with spark.
>>>
>>>
>>> Thanks !!!
>>> Abhi
>>>
>>> On Mon, Feb 22, 2016 at 7:42 PM, Abhishek Anand <abhis.anan...@gmail.com
>>> > wrote:
>>>
>>>> Hi ,
>>>>
>>>> I am getting the following exception on running my spark streaming job.
>>>>
>>>> The same job has been running fine since long and when I added two new
>>>> machines to my cluster I see the job failing with the following exception.
>>>>
>>>>
>>>>
>>>> 16/02/22 19:23:01 ERROR Executor: Exception in task 2.0 in stage 4229.0
>>>> (TID 22594)
>>>> java.io.IOException: java.lang.reflect.InvocationTargetException
>>>> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1257)
>>>> at
>>>> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
>>>> at
>>>> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>>>> at
>>>> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>>>> at
>>>> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
>>>> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:59)
>>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>> at java.lang.Thread.run(Thread.java:744)
>>>> Caused by: java.lang.reflect.InvocationTargetException
>>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>>> at
>>>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>>> at
>>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>> at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
>>>> at
>>>> org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68)
>>>> at
>>>> org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60)
>>>> at org.apache.spark.broadcast.TorrentBroadcast.org
>>>> $apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73)
>>>> at
>>>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:167)
>>>> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1254)
>>>> ... 11 more
>>>> Caused by: java.lang.IllegalArgumentException
>>>> at
>>>> org.apache.spark.io.SnappyCompressionCodec.(CompressionCodec.scala:152)
>>>> ... 20 more
>>>>
>>>>
>>>>
>>>> Thanks !!!
>>>> Abhi
>>>>
>>>
>>>
>>
>


Re: Stateful Operation on JavaPairDStream Help Needed !!

2016-02-27 Thread Abhishek Anand
Hi Ryan,

I am using mapWithState after doing reduceByKey.

I am right now using mapWithState as you suggested and triggering the count
manually.

But, still unable to see any checkpointing taking place. In the DAG I can
see that the reduceByKey operation for the previous batches are also being
computed.


Thanks
Abhi


On Tue, Feb 23, 2016 at 2:36 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com
> wrote:

> Hey Abhi,
>
> Using reducebykeyandwindow and mapWithState will trigger the bug
> in SPARK-6847. Here is a workaround to trigger checkpoint manually:
>
> JavaMapWithStateDStream<...> stateDStream =
> myPairDstream.mapWithState(StateSpec.function(mappingFunc));
> stateDStream.foreachRDD(new Function1<...>() {
>   @Override
>   public Void call(JavaRDD<...> rdd) throws Exception {
> rdd.count();
>   }
> });
> return stateDStream.stateSnapshots();
>
>
> On Mon, Feb 22, 2016 at 12:25 PM, Abhishek Anand <abhis.anan...@gmail.com>
> wrote:
>
>> Hi Ryan,
>>
>> Reposting the code.
>>
>> Basically my use case is something like - I am receiving the web
>> impression logs and may get the notify (listening from kafka) for those
>> impressions in the same interval (for me its 1 min) or any next interval
>> (upto 2 hours). Now, when I receive notify for a particular impression I
>> need to swap the date field in impression with the date field in notify
>> logs. The notify for an impression has the same key as impression.
>>
>> static Function3<String, Optional, State,
>> Tuple2<String, MyClass>> mappingFunc =
>> new Function3<String, Optional, State, Tuple2<String,
>> MyClass>>() {
>> @Override
>> public Tuple2<String, MyClass> call(String key, Optional one,
>> State state) {
>> MyClass nullObj = new MyClass();
>> nullObj.setImprLog(null);
>> nullObj.setNotifyLog(null);
>> MyClass current = one.or(nullObj);
>>
>> if(current!= null && current.getImprLog() != null &&
>> current.getMyClassType() == 1 /*this is impression*/){
>> return new Tuple2<>(key, null);
>> }
>> else if (current.getNotifyLog() != null  && current.getMyClassType() == 3
>> /*notify for the impression received*/){
>> MyClass oldState = (state.exists() ? state.get() : nullObj);
>> if(oldState!= null && oldState.getNotifyLog() != null){
>> oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>  //swappping the dates
>> return new Tuple2<>(key, oldState);
>> }
>> else{
>> return new Tuple2<>(key, null);
>> }
>> }
>> else{
>> return new Tuple2<>(key, null);
>> }
>>
>> }
>> };
>>
>>
>> return
>> myPairDstream.mapWithState(StateSpec.function(mappingFunc)).stateSnapshots();
>>
>>
>> Currently I am using reducebykeyandwindow without the inverse function
>> and I am able to get the correct data. But, issue the might arise is when I
>> have to restart my application from checkpoint and it repartitions and
>> computes the previous 120 partitions, which delays the incoming batches.
>>
>>
>> Thanks !!
>> Abhi
>>
>> On Tue, Feb 23, 2016 at 1:25 AM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> Hey Abhi,
>>>
>>> Could you post how you use mapWithState? By default, it should do
>>> checkpointing every 10 batches.
>>> However, there is a known issue that prevents mapWithState from
>>> checkpointing in some special cases:
>>> https://issues.apache.org/jira/browse/SPARK-6847
>>>
>>> On Mon, Feb 22, 2016 at 5:47 AM, Abhishek Anand <abhis.anan...@gmail.com
>>> > wrote:
>>>
>>>> Any Insights on this one ?
>>>>
>>>>
>>>> Thanks !!!
>>>> Abhi
>>>>
>>>> On Mon, Feb 15, 2016 at 11:08 PM, Abhishek Anand <
>>>> abhis.anan...@gmail.com> wrote:
>>>>
>>>>> I am now trying to use mapWithState in the following way using some
>>>>> example codes. But, by looking at the DAG it does not seem to checkpoint
>>>>> the state and when restarting the application from checkpoint, it
>>>>> re-partitions all the previous batches data from kafka.
>>>>>
>>>>> static Function3<String, Optional, State,
>>>>> Tuple2<String, MyClass>> mappingFunc =
>>>>> new Function3<String, Optional, State,
>>>>> Tuple2<String, My

Re: java.io.IOException: java.lang.reflect.InvocationTargetException on new spark machines

2016-02-26 Thread Abhishek Anand
Any insights on this ?

On Fri, Feb 26, 2016 at 1:21 PM, Abhishek Anand <abhis.anan...@gmail.com>
wrote:

> On changing the default compression codec which is snappy to lzf the
> errors are gone !!
>
> How can I fix this using snappy as the codec ?
>
> Is there any downside of using lzf as snappy is the default codec that
> ships with spark.
>
>
> Thanks !!!
> Abhi
>
> On Mon, Feb 22, 2016 at 7:42 PM, Abhishek Anand <abhis.anan...@gmail.com>
> wrote:
>
>> Hi ,
>>
>> I am getting the following exception on running my spark streaming job.
>>
>> The same job has been running fine since long and when I added two new
>> machines to my cluster I see the job failing with the following exception.
>>
>>
>>
>> 16/02/22 19:23:01 ERROR Executor: Exception in task 2.0 in stage 4229.0
>> (TID 22594)
>> java.io.IOException: java.lang.reflect.InvocationTargetException
>> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1257)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
>> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:59)
>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:744)
>> Caused by: java.lang.reflect.InvocationTargetException
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
>> at
>> org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68)
>> at
>> org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60)
>> at org.apache.spark.broadcast.TorrentBroadcast.org
>> $apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:167)
>> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1254)
>> ... 11 more
>> Caused by: java.lang.IllegalArgumentException
>> at
>> org.apache.spark.io.SnappyCompressionCodec.(CompressionCodec.scala:152)
>> ... 20 more
>>
>>
>>
>> Thanks !!!
>> Abhi
>>
>
>


Re: java.io.IOException: java.lang.reflect.InvocationTargetException on new spark machines

2016-02-25 Thread Abhishek Anand
On changing the default compression codec from snappy to lzf, the errors
are gone !!

How can I fix this while keeping snappy as the codec ?

Is there any downside to using lzf, given that snappy is the default codec
that ships with Spark ?


Thanks !!!
Abhi

On Mon, Feb 22, 2016 at 7:42 PM, Abhishek Anand <abhis.anan...@gmail.com>
wrote:

> Hi ,
>
> I am getting the following exception on running my spark streaming job.
>
> The same job has been running fine since long and when I added two new
> machines to my cluster I see the job failing with the following exception.
>
>
>
> 16/02/22 19:23:01 ERROR Executor: Exception in task 2.0 in stage 4229.0
> (TID 22594)
> java.io.IOException: java.lang.reflect.InvocationTargetException
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1257)
> at
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
> at
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
> at
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
> at
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:59)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:744)
> Caused by: java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
> at
> org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68)
> at
> org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60)
> at org.apache.spark.broadcast.TorrentBroadcast.org
> $apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:167)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1254)
> ... 11 more
> Caused by: java.lang.IllegalArgumentException
> at
> org.apache.spark.io.SnappyCompressionCodec.(CompressionCodec.scala:152)
> ... 20 more
>
>
>
> Thanks !!!
> Abhi
>


RE: LDA topic Modeling spark + python

2016-02-24 Thread Mishra, Abhishek
Hello All,

If someone has any leads on this please help me.

Sincerely,
Abhishek

From: Mishra, Abhishek
Sent: Wednesday, February 24, 2016 5:11 PM
To: user@spark.apache.org
Subject: LDA topic Modeling spark + python


Hello All,





I am doing a LDA model, please guide me with something.



I have a csv file which has two column "user_id" and "status". I have to 
generate a word-topic distribution after aggregating the user_id. Meaning to 
say I need to model it for users on their grouped status. The topic length 
being 2000 and value of k or number of words being 3.



Please, if you can provide me with some link or some code base on spark with 
python ; I would be grateful.





Looking forward for a  reply,



Sincerely,

Abhishek



LDA topic Modeling spark + python

2016-02-24 Thread Mishra, Abhishek
Hello All,





I am building an LDA model, please guide me.



I have a csv file which has two columns, "user_id" and "status". I have to
generate a word-topic distribution after aggregating by user_id. That is to
say, I need to model it for users on their grouped statuses. The topic length
is 2000 and the value of k, or number of words, is 3.



Please, if you can provide me with some link or some code base on Spark with
Python, I would be grateful.





Looking forward to a reply,



Sincerely,

Abhishek
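
The question asks for PySpark, but for reference here is a minimal sketch of
the flow with the spark.ml API in Scala; reading "topic length 2000" as the
vocabulary size and k = 3 as the number of topics is an assumption, and
`grouped` is an assumed DataFrame with each user_id's statuses concatenated
into a "status_text" column:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.clustering.{LDA, LDAModel}
import org.apache.spark.ml.feature.{CountVectorizer, Tokenizer}

val tokenizer  = new Tokenizer().setInputCol("status_text").setOutputCol("words")
val vectorizer = new CountVectorizer().setInputCol("words")
  .setOutputCol("features").setVocabSize(2000)
val lda = new LDA().setK(3).setMaxIter(20).setFeaturesCol("features")

val pipeline = new Pipeline().setStages(Array(tokenizer, vectorizer, lda))
val fitted   = pipeline.fit(grouped)

// word-topic distribution: top terms per topic
val topics = fitted.stages.last.asInstanceOf[LDAModel].describeTopics(10)
topics.show(false)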



value from groupBy paired rdd

2016-02-23 Thread Mishra, Abhishek
Hello All,


I am new to spark and python, here is my doubt, please suggest...

I have a csv file which has 2 columns, "user_id" and "status".
I have read it into an RDD and then removed the header of the csv file. Then I
split each record by "," (comma) and generate a pair RDD. On that RDD I call
groupByKey. Now, when I try to gather only the values from the RDD and create a
list, I am getting exceptions. Here is my code. Please suggest how I can just
get the values from the grouped RDD and store them; the csv has 2 columns and I
am trying to extract the values using x[1]. The code in pyspark:

data = sc.textFile('file:///home/cloudera/LDA-Model/Pyspark/test1.csv')
header = data.first() #extract header
data = data.filter(lambda x:x !=header)#filter out header
pairs = data.map(lambda x: (x.split(",")[0], x)) #.collect() #generate pair rdd key value
grouped=pairs.groupByKey()#grouping values as per key
grouped_val= grouped.map(lambda x : (list(x[1]))).collect()
print grouped_val



Thanks in Advance,
Sincerely,
Abhishek



Query Kafka Partitions from Spark SQL

2016-02-23 Thread Abhishek Anand
Is there a way to query the json (or any other format) data stored in kafka
using spark sql by providing the offset range on each of the brokers ?

I just want to be able to query all the partitions in a SQL manner.

Thanks !
Abhi
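
One way to do this with the Spark 1.x Kafka integration is to build an RDD
from explicit offset ranges with KafkaUtils.createRDD and then read the JSON
values into a DataFrame. A sketch, assuming a spark-shell with `sc` and
`sqlContext` in scope (broker, topic and offsets are made up):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")

// one OffsetRange per (topic, partition); offsets are [fromOffset, untilOffset)
val ranges = Array(
  OffsetRange("events", 0, 100L, 200L),
  OffsetRange("events", 1, 150L, 250L))

val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, ranges)

val df = sqlContext.read.json(rdd.map(_._2)) // the values are JSON strings
df.registerTempTable("kafka_slice")
sqlContext.sql("SELECT count(*) FROM kafka_slice").show()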


RE: Sample project on Image Processing

2016-02-22 Thread Mishra, Abhishek
Thank you Everyone.
I am to work on a PoC with 2 types of images, which basically will be two PoCs:
face recognition and map data processing.

I am looking into these links and hopefully will get an idea. Thanks again. Will
post queries as and when I have doubts.

Sincerely,
Abhishek

From: ndj...@gmail.com [mailto:ndj...@gmail.com]
Sent: Monday, February 22, 2016 7:31 PM
To: Sainath Palla
Cc: Mishra, Abhishek; user@spark.apache.org
Subject: Re: Sample project on Image Processing

Hi folks,

KeystoneML has some image processing features: 
http://keystone-ml.org/examples.html

 Cheers,
Ardo

Sent from my iPhone

On 22 Feb 2016, at 14:34, Sainath Palla 
<pallasain...@gmail.com<mailto:pallasain...@gmail.com>> wrote:
Here is one simple example of Image classification in Java.

http://blogs.quovantis.com/image-classification-using-apache-spark-with-linear-svm/

Personally, I feel python provides better libraries for image processing. But 
it mostly depends on what kind of Image processing you are doing.

If you are stuck at the initial stages to load/save images, here is sample code 
to do the same. This is in PySpark.



from PIL import Image
from StringIO import StringIO  # needed to open the raw bytes (Python 2)
import numpy as np

#Load images in the form of binary files

images = sc.binaryFiles("Path")

#Convert image to array. It converts the image into [x,y,3] format
# x,y are image dimensions and 3 is for R,G,B format.

image_to_array = lambda rawdata: np.asarray(Image.open(StringIO(rawdata)))

#Saving the image to file after processing
#x has the image name and img has the image
#(imageOutIMG is the RDD of (name, image) pairs produced by your processing step)

for x, img in imageOutIMG.toLocalIterator():
    path = "Path" + x + ".jpg"
    img.save(path)






On Mon, Feb 22, 2016 at 3:23 AM, Mishra, Abhishek 
<abhishek.mis...@xerox.com<mailto:abhishek.mis...@xerox.com>> wrote:
Hello,
I am working on image processing samples. Was wondering if anyone has worked on 
Image processing project in spark. Please let me know if any sample project or 
example is available.

Please guide in this.
Sincerely,
Abhishek



Re: Stateful Operation on JavaPairDStream Help Needed !!

2016-02-22 Thread Abhishek Anand
Hi Ryan,

Reposting the code.

Basically my use case is something like this - I am receiving the web
impression logs and may get the notify (listening from kafka) for those
impressions in the same interval (for me it's 1 min) or any later interval
(up to 2 hours). Now, when I receive the notify for a particular impression I
need to swap the date field in the impression with the date field in the
notify log. The notify for an impression has the same key as the impression.

static Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String,
MyClass>> mappingFunc =
new Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String,
MyClass>>() {
@Override
public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
State<MyClass> state) {
MyClass nullObj = new MyClass();
nullObj.setImprLog(null);
nullObj.setNotifyLog(null);
MyClass current = one.or(nullObj);

if(current!= null && current.getImprLog() != null &&
current.getMyClassType() == 1 /*this is impression*/){
return new Tuple2<>(key, null);
}
else if (current.getNotifyLog() != null  && current.getMyClassType() == 3
/*notify for the impression received*/){
MyClass oldState = (state.exists() ? state.get() : nullObj);
if(oldState!= null && oldState.getNotifyLog() != null){
oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
 //swappping the dates
return new Tuple2<>(key, oldState);
}
else{
return new Tuple2<>(key, null);
}
}
else{
return new Tuple2<>(key, null);
}

}
};


return
myPairDstream.mapWithState(StateSpec.function(mappingFunc)).stateSnapshots();


Currently I am using reduceByKeyAndWindow without the inverse function and
I am able to get the correct data. But the issue that might arise is when I
have to restart my application from checkpoint: it repartitions and
computes the previous 120 partitions, which delays the incoming batches.


Thanks !!
Abhi

On Tue, Feb 23, 2016 at 1:25 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com
> wrote:

> Hey Abhi,
>
> Could you post how you use mapWithState? By default, it should do
> checkpointing every 10 batches.
> However, there is a known issue that prevents mapWithState from
> checkpointing in some special cases:
> https://issues.apache.org/jira/browse/SPARK-6847
>
> On Mon, Feb 22, 2016 at 5:47 AM, Abhishek Anand <abhis.anan...@gmail.com>
> wrote:
>
>> Any Insights on this one ?
>>
>>
>> Thanks !!!
>> Abhi
>>
>> On Mon, Feb 15, 2016 at 11:08 PM, Abhishek Anand <abhis.anan...@gmail.com
>> > wrote:
>>
>>> I am now trying to use mapWithState in the following way using some
>>> example codes. But, by looking at the DAG it does not seem to checkpoint
>>> the state and when restarting the application from checkpoint, it
>>> re-partitions all the previous batches data from kafka.
>>>
>>> static Function3<String, Optional, State,
>>> Tuple2<String, MyClass>> mappingFunc =
>>> new Function3<String, Optional, State, Tuple2<String,
>>> MyClass>>() {
>>> @Override
>>> public Tuple2<String, MyClass> call(String key, Optional one,
>>> State state) {
>>> MyClass nullObj = new MyClass();
>>> nullObj.setImprLog(null);
>>> nullObj.setNotifyLog(null);
>>> MyClass current = one.or(nullObj);
>>>
>>> if(current!= null && current.getImprLog() != null &&
>>> current.getMyClassType() == 1){
>>> return new Tuple2<>(key, null);
>>> }
>>> else if (current.getNotifyLog() != null  && current.getMyClassType() ==
>>> 3){
>>> MyClass oldState = (state.exists() ? state.get() : nullObj);
>>> if(oldState!= null && oldState.getNotifyLog() != null){
>>> oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>> return new Tuple2<>(key, oldState);
>>> }
>>> else{
>>> return new Tuple2<>(key, null);
>>> }
>>> }
>>> else{
>>> return new Tuple2<>(key, null);
>>> }
>>>
>>> }
>>> };
>>>
>>>
>>> Please suggest if this is the proper way or am I doing something wrong.
>>>
>>>
>>> Thanks !!
>>> Abhi
>>>
>>> On Sun, Feb 14, 2016 at 2:26 AM, Sebastian Piu <sebastian@gmail.com>
>>> wrote:
>>>
>>>> If you don't want to update your only option will be updateStateByKey
>>>> then
>>>> On 13 Feb 2016 8:48 p.m., "Ted Yu" <yuzhih...@gmail.com> wrote:
>>>>
>>>>> mapWithState supports checkpoint.
>>>>>
>>>>> There has been some bug fix since release of 1.6.0
>&

java.io.IOException: java.lang.reflect.InvocationTargetException on new spark machines

2016-02-22 Thread Abhishek Anand
Hi ,

I am getting the following exception on running my spark streaming job.

The same job has been running fine since long and when I added two new
machines to my cluster I see the job failing with the following exception.



16/02/22 19:23:01 ERROR Executor: Exception in task 2.0 in stage 4229.0
(TID 22594)
java.io.IOException: java.lang.reflect.InvocationTargetException
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1257)
at
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
at
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:59)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
at
org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68)
at
org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60)
at org.apache.spark.broadcast.TorrentBroadcast.org
$apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:167)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1254)
... 11 more
Caused by: java.lang.IllegalArgumentException
at
org.apache.spark.io.SnappyCompressionCodec.(CompressionCodec.scala:152)
... 20 more



Thanks !!!
Abhi


Re: Stateful Operation on JavaPairDStream Help Needed !!

2016-02-22 Thread Abhishek Anand
Any Insights on this one ?


Thanks !!!
Abhi

On Mon, Feb 15, 2016 at 11:08 PM, Abhishek Anand <abhis.anan...@gmail.com>
wrote:

> I am now trying to use mapWithState in the following way using some
> example codes. But, by looking at the DAG it does not seem to checkpoint
> the state and when restarting the application from checkpoint, it
> re-partitions all the previous batches data from kafka.
>
> static Function3<String, Optional, State, Tuple2<String,
> MyClass>> mappingFunc =
> new Function3<String, Optional, State, Tuple2<String,
> MyClass>>() {
> @Override
> public Tuple2<String, MyClass> call(String key, Optional one,
> State state) {
> MyClass nullObj = new MyClass();
> nullObj.setImprLog(null);
> nullObj.setNotifyLog(null);
> MyClass current = one.or(nullObj);
>
> if(current!= null && current.getImprLog() != null &&
> current.getMyClassType() == 1){
> return new Tuple2<>(key, null);
> }
> else if (current.getNotifyLog() != null  && current.getMyClassType() == 3){
> MyClass oldState = (state.exists() ? state.get() : nullObj);
> if(oldState!= null && oldState.getNotifyLog() != null){
> oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
> return new Tuple2<>(key, oldState);
> }
> else{
> return new Tuple2<>(key, null);
> }
> }
> else{
> return new Tuple2<>(key, null);
> }
>
> }
> };
>
>
> Please suggest if this is the proper way or am I doing something wrong.
>
>
> Thanks !!
> Abhi
>
> On Sun, Feb 14, 2016 at 2:26 AM, Sebastian Piu <sebastian@gmail.com>
> wrote:
>
>> If you don't want to update your only option will be updateStateByKey then
>> On 13 Feb 2016 8:48 p.m., "Ted Yu" <yuzhih...@gmail.com> wrote:
>>
>>> mapWithState supports checkpoint.
>>>
>>> There has been some bug fix since release of 1.6.0
>>> e.g.
>>>   SPARK-12591 NullPointerException using checkpointed mapWithState with
>>> KryoSerializer
>>>
>>> which is in the upcoming 1.6.1
>>>
>>> Cheers
>>>
>>> On Sat, Feb 13, 2016 at 12:05 PM, Abhishek Anand <
>>> abhis.anan...@gmail.com> wrote:
>>>
>>>> Does mapWithState checkpoints the data ?
>>>>
>>>> When my application goes down and is restarted from checkpoint, will
>>>> mapWithState need to recompute the previous batches data ?
>>>>
>>>> Also, to use mapWithState I will need to upgrade my application as I am
>>>> using version 1.4.0 and mapWithState isnt supported there. Is there any
>>>> other work around ?
>>>>
>>>> Cheers!!
>>>> Abhi
>>>>
>>>> On Fri, Feb 12, 2016 at 3:33 AM, Sebastian Piu <sebastian@gmail.com
>>>> > wrote:
>>>>
>>>>> Looks like mapWithState could help you?
>>>>> On 11 Feb 2016 8:40 p.m., "Abhishek Anand" <abhis.anan...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I have an use case like follows in my production environment where I
>>>>>> am listening from kafka with slideInterval of 1 min and windowLength of 2
>>>>>> hours.
>>>>>>
>>>>>> I have a JavaPairDStream where for each key I am getting the same key
>>>>>> but with different value,which might appear in the same batch or some 
>>>>>> next
>>>>>> batch.
>>>>>>
>>>>>> When the key appears second time I need to update a field in value of
>>>>>> previous key with a field in the later key. The keys for which the
>>>>>> combination keys do not come should be rejected after 2 hours.
>>>>>>
>>>>>> At the end of each second I need to output the result to external
>>>>>> database.
>>>>>>
>>>>>> For example :
>>>>>>
>>>>>> Suppose valueX is object of MyClass with fields int a, String b
>>>>>> At t=1sec I am getting
>>>>>> key0,value0(0,"prev0")
>>>>>> key1,value1 (1, "prev1")
>>>>>> key2,value2 (2,"prev2")
>>>>>> key2,value3 (3, "next2")
>>>>>>
>>>>>> Output to database after 1 sec
>>>>>> key2, newValue (2,"next2")
>>>>>>
>>>>>> At t=2 sec getting
>>>>>> key3,value4(4,"prev3")
>>>>>> key1,value5(5,"next1")
>>>>>>
>>>>>> Output to database after 2 sec
>>>>>> key1,newValue(1,"next1")
>>>>>>
>>>>>> At t=3 sec
>>>>>> key4,value6(6,"prev4")
>>>>>> key3,value7(7,"next3")
>>>>>> key5,value5(8,"prev5")
>>>>>> key5,value5(9,"next5")
>>>>>> key0,value0(10,"next0")
>>>>>>
>>>>>> Output to database after 3 sec
>>>>>> key0,newValue(0,"next0")
>>>>>> key3,newValue(4,"next3")
>>>>>> key5,newValue(8,"next5")
>>>>>>
>>>>>>
>>>>>> Please suggest how this can be achieved.
>>>>>>
>>>>>>
>>>>>> Thanks a lot 
>>>>>> Abhi
>>>>>>
>>>>>>
>>>>>>
>>>>
>>>
>


Sample project on Image Processing

2016-02-22 Thread Mishra, Abhishek
Hello,
I am working on image processing samples. I was wondering if anyone has worked
on an image processing project in Spark. Please let me know if any sample
project or example is available.

Please guide in this.
Sincerely,
Abhishek


Re: Worker's BlockManager Folder not getting cleared

2016-02-17 Thread Abhishek Anand
Looking for an answer to this.

Is it safe to delete the older files using

find . -type f -cmin +200 -name "shuffle*" -exec rm -rf {} \;

For a window duration of 2 hours, how old do the files need to be before we can delete them ?

Thanks.

On Sun, Feb 14, 2016 at 12:34 PM, Abhishek Anand <abhis.anan...@gmail.com>
wrote:

> Hi All,
>
> Any ideas on this one ?
>
> The size of this directory keeps on growing.
>
> I can see there are many files from a day earlier too.
>
> Cheers !!
> Abhi
>
> On Tue, Jan 26, 2016 at 7:13 PM, Abhishek Anand <abhis.anan...@gmail.com>
> wrote:
>
>> Hi Adrian,
>>
>> I am running spark in standalone mode.
>>
>> The spark version that I am using is 1.4.0
>>
>> Thanks,
>> Abhi
>>
>> On Tue, Jan 26, 2016 at 4:10 PM, Adrian Bridgett <adr...@opensignal.com>
>> wrote:
>>
>>> Hi Abhi - are you running on Mesos perchance?
>>>
>>> If so then with spark <1.6 you will be hitting
>>> https://issues.apache.org/jira/browse/SPARK-10975
>>> With spark >= 1.6:
>>> https://issues.apache.org/jira/browse/SPARK-12430
>>> and also be aware of:
>>> https://issues.apache.org/jira/browse/SPARK-12583
>>>
>>>
>>> On 25/01/2016 07:14, Abhishek Anand wrote:
>>>
>>> Hi All,
>>>
>>> How long the shuffle files and data files are stored on the block
>>> manager folder of the workers.
>>>
>>> I have a spark streaming job with window duration of 2 hours and slide
>>> interval of 15 minutes.
>>>
>>> When I execute the following command in my block manager path
>>>
>>> find . -type f -cmin +150 -name "shuffle*" -exec ls {} \;
>>>
>>> I see a lot of files which means that they are not getting cleared which
>>> I was expecting that they should get cleared.
>>>
>>> Subsequently, this size keeps on increasing and takes space on the disk.
>>>
>>> Please suggest how to get rid of this and help on understanding this
>>> behaviour.
>>>
>>>
>>>
>>> Thanks !!!
>>> Abhi
>>>
>>>
>>> --
>>> *Adrian Bridgett* |  Sysadmin Engineer, OpenSignal
>>> <http://www.opensignal.com>
>>> _
>>> Office: 3rd Floor, The Angel Office, 2 Angel Square, London, EC1V 1NY
>>> Phone #: +44 777-377-8251
>>> Skype: abridgett  |  @adrianbridgett <http://twitter.com/adrianbridgett>
>>>   |  LinkedIn link  <https://uk.linkedin.com/in/abridgett>
>>> _
>>>
>>
>>
>


Re: Saving Kafka Offsets to Cassandra at begining of each batch in Spark Streaming

2016-02-16 Thread Abhishek Anand
Hi Cody,

I am able to do using this piece of code

kafkaStreamRdd.foreachRDD((rdd,batchMilliSec) -> {
Date currentBatchTime = new Date();
currentBatchTime.setTime(batchMilliSec.milliseconds());
List r = new ArrayList();
OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd.rdd()).offsetRanges();
for(int partition = 0; partition < offsetRanges.length; partition++){
//Add offsets to the list
}
JavaSparkContext ctx = new JavaSparkContext(rdd.context());
JavaRDD currrentBatchOffsets = ctx.parallelize(r);
//write currrentBatchOffsets rdd to cassandra
return null;
});


Is this the correct way of doing this ?


Thanks !!
Abhi

On Tue, Feb 16, 2016 at 9:31 PM, Cody Koeninger <c...@koeninger.org> wrote:

> You could use sc.parallelize... but the offsets are already available at
> the driver, and they're a (hopefully) small enough amount of data that's
> it's probably more straightforward to just use the normal cassandra client
> to save them from the driver.
>
> On Tue, Feb 16, 2016 at 1:15 AM, Abhishek Anand <abhis.anan...@gmail.com>
> wrote:
>
>> I have a kafka rdd and I need to save the offsets to cassandra table at
>> the begining of each batch.
>>
>> Basically I need to write the offsets of the type Offsets below that I am
>> getting inside foreachRD, to cassandra. The javafunctions api to write to
>> cassandra needs a rdd. How can I create a rdd from offsets and write to
>> cassandra table.
>>
>>
>> public static void writeOffsets(JavaPairDStream<String,
>> String> kafkastream){
>> kafkastream.foreachRDD((rdd,batchMilliSec) -> {
>> OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>> return null;
>> });
>>
>>
>> Thanks !!
>> Abhi
>>
>>
>>
>
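
Following Cody's suggestion, a minimal sketch of writing the offsets straight
from the driver with the plain DataStax Java driver instead of parallelizing
them into an RDD; the contact point, keyspace and table are hypothetical, and
`kafkaStream` stands for the direct Kafka stream:

import com.datastax.driver.core.Cluster
import org.apache.spark.streaming.kafka.HasOffsetRanges

// a plain Cassandra session living in the driver JVM
val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
val session = cluster.connect()

kafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    session.execute(
      s"INSERT INTO ks.kafka_offsets (topic, kafka_partition, from_offset, until_offset) " +
      s"VALUES ('${o.topic}', ${o.partition}, ${o.fromOffset}, ${o.untilOffset})")
  }
}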


Re: Re: Unusually large deserialisation time

2016-02-16 Thread Abhishek Modi
PS - I don't get this behaviour in all cases. I did many runs of the
same job and I get this behaviour in around 40% of the cases.

Task 4 is the bottom row in the metrics table

Thank you,
Abhishek

e: abshkm...@gmail.com
p: 91-8233540996


On Tue, Feb 16, 2016 at 11:19 PM, Abhishek Modi <abshkm...@gmail.com> wrote:

> Darren: this is not the last task of the stage.
>
> Thank you,
> Abhishek
>
> e: abshkm...@gmail.com
> p: 91-8233540996
>
>
> On Tue, Feb 16, 2016 at 6:52 PM, Darren Govoni <dar...@ontrenet.com>
> wrote:
>
>> There were some posts in this group about it. Another person also saw the
>> deadlock on next to last or last stage task.
>>
>> I've attached some images I collected showing this problem.
>>
>>
>>
>> --- Original Message ---
>> On 2/16/2016  07:29 AM Ted Yu wrote:Darren:
>> Can you post link to the deadlock issue you mentioned ?
>> 
>> Thanks
>> 
>> > On Feb 16, 2016, at 6:55 AM, Darren Govoni <dar...@ontrenet.com>
>> wrote:
>> > > I think this is part of the bigger issue of serious deadlock
>> conditions occurring in spark many of us have posted on.
>> > > Would the task in question be the past task of a stage by
>> chance?
>> > > > > Sent from my Verizon Wireless 4G LTE smartphone
>> > > >  Original message 
>> > From: Abhishek Modi <abshkm...@gmail.com> > Date: 02/16/2016
>> 4:12 AM (GMT-05:00) > To: user@spark.apache.org > Subject:
>> Unusually large deserialisation time > > I'm doing a mapPartitions
>> on a rdd cached in memory followed by a reduce. Here is my code snippet
>> > > // myRdd is an rdd consisting of Tuple2[Int,Long] >
>> myRdd.mapPartitions(rangify).reduce( (x,y) => (x._1+y._1,x._2 ++ y._2))
>> > > //The rangify function > def rangify(l: Iterator[
>> Tuple2[Int,Long] ]) : Iterator[ Tuple2[Long, List [ ArrayBuffer[
>> Tuple2[Long,Long] ] ] ] ]= { >   var sum=0L >   val
>> mylist=ArrayBuffer[ Tuple2[Long,Long] ]() > >   if(l.isEmpty)
>> > return List( (0L,List [ ArrayBuffer[ Tuple2[Long,Long] ] ]
>> ())).toIterator > >   var prev= -1000L >   var begin= -1000L
>> > >   for (x <- l){ > sum+=x._1 > > if(prev<0){
>> >   prev=x._2 >   begin=x._2 > } > >
>>  else if(x._2==prev+1) >   prev=x._2 > > else { >
>>  list+=((begin,prev)) >   prev=x._2 >   begin=x._2
>> > } >   } > >   mylist+= ((begin,prev)) > >
>>  List((sum, List(mylist) ) ).toIterator > } > > > The rdd
>> is cached in memory. I'm using 20 executors with 1 core for each executor.
>> The cached rdd has 60 blocks. The problem is for every 2-3 runs of the job,
>> there is a task which has an abnormally large deserialisation time.
>> Screenshot attached > > Thank you,
>> > Abhishek
>> > 
>
>
>


Re: Re: Unusually large deserialisation time

2016-02-16 Thread Abhishek Modi
Darren: this is not the last task of the stage.

Thank you,
Abhishek

e: abshkm...@gmail.com
p: 91-8233540996


On Tue, Feb 16, 2016 at 6:52 PM, Darren Govoni <dar...@ontrenet.com> wrote:

> There were some posts in this group about it. Another person also saw the
> deadlock on next to last or last stage task.
>
> I've attached some images I collected showing this problem.
>
>
>
> --- Original Message ---
> On 2/16/2016 07:29 AM Ted Yu wrote:
> Darren:
> Can you post link to the deadlock issue you mentioned ?
>
> Thanks
>
> > On Feb 16, 2016, at 6:55 AM, Darren Govoni <dar...@ontrenet.com> wrote:
> >
> > I think this is part of the bigger issue of serious deadlock
> > conditions occurring in spark many of us have posted on.
> >
> > Would the task in question be the last task of a stage by chance?
> >
> > Sent from my Verizon Wireless 4G LTE smartphone
> >
> > -------- Original message --------
> > From: Abhishek Modi <abshkm...@gmail.com>
> > Date: 02/16/2016 4:12 AM (GMT-05:00)
> > To: user@spark.apache.org
> > Subject: Unusually large deserialisation time
> >
> > [Quoted: the original question and rangify code snippet; the full post
> > appears under "Unusually large deserialisation time" below.]


Unusually large deserialisation time

2016-02-16 Thread Abhishek Modi
I'm doing a mapPartitions on an RDD cached in memory, followed by a reduce.
Here is my code snippet:

// myRdd is an rdd consisting of Tuple2[Int,Long]
myRdd.mapPartitions(rangify).reduce( (x,y) => (x._1+y._1,x._2 ++ y._2))

//The rangify function
def rangify(l: Iterator[ Tuple2[Int,Long] ]) : Iterator[ Tuple2[Long, List
[ ArrayBuffer[ Tuple2[Long,Long] ] ] ] ]= {
  var sum=0L
  val mylist=ArrayBuffer[ Tuple2[Long,Long] ]()

  if(l.isEmpty)
return List( (0L,List [ ArrayBuffer[ Tuple2[Long,Long] ] ]
())).toIterator

  var prev= -1000L
  var begin= -1000L

  for (x <- l){
sum+=x._1

if(prev<0){
  prev=x._2
  begin=x._2
}

else if(x._2==prev+1)
  prev=x._2

else {
  mylist+=((begin,prev))
  prev=x._2
  begin=x._2
}
  }

  mylist+= ((begin,prev))

  List((sum, List(mylist) ) ).toIterator
}


The rdd is cached in memory. I'm using 20 executors with 1 core for each
executor. The cached rdd has 60 blocks. The problem is for every 2-3 runs
of the job, there is a task which has an abnormally large deserialisation
time. Screenshot attached

Thank you,
Abhishek


Abnormally large deserialisation time for some tasks

2016-02-16 Thread Abhishek Modi
I'm doing a mapPartitions on an RDD cached in memory, followed by a reduce.
Here is my code snippet:

// myRdd is an rdd consisting of Tuple2[Int,Long]
myRdd.mapPartitions(rangify).reduce( (x,y) => (x._1+y._1,x._2 ++ y._2))

//The rangify function
def rangify(l: Iterator[ Tuple2[Int,Long] ]) : Iterator[ Tuple2[Long, List [
ArrayBuffer[ Tuple2[Long,Long] ] ] ] ]= {
  var sum=0L
  val mylist=ArrayBuffer[ Tuple2[Long,Long] ]()

  if(l.isEmpty)
return List( (0L,List [ ArrayBuffer[ Tuple2[Long,Long] ] ]
())).toIterator

  var prev= -1000L
  var begin= -1000L

  for (x <- l){
sum+=x._1

if(prev<0){
  prev=x._2
  begin=x._2
}

else if(x._2==prev+1)
  prev=x._2

else {
  mylist+=((begin,prev))
  prev=x._2
  begin=x._2
}
  }

  mylist+= ((begin,prev))

  List((sum, List(mylist) ) ).toIterator
}


The rdd is cached in memory. I'm using 20 executors with 1 core for each
executor. The cached rdd has 60 blocks. The problem is for every 2-3 runs of
the job, there is a task which has an abnormally large deserialisation time.
Screenshot attached

What could be the possible reason for this ?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Abnormally-large-deserialisation-time-for-some-tasks-tp26233.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Saving Kafka Offsets to Cassandra at begining of each batch in Spark Streaming

2016-02-15 Thread Abhishek Anand
I have a Kafka RDD and I need to save the offsets to a Cassandra table at the
beginning of each batch.

Basically, I need to write the offsets (of the type shown below) that I am
getting inside foreachRDD to Cassandra. The javaFunctions API used to write to
Cassandra needs an RDD. How can I create an RDD from the offsets and write it
to a Cassandra table?


public static void writeOffsets(JavaPairDStream kafkastream) {
    kafkastream.foreachRDD((rdd, batchMilliSec) -> {
        OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        // need to turn `offsets` into an RDD and write it to Cassandra here
        return null;
    });
}


Thanks !!
Abhi
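
One way to get an RDD out of those offsets (a minimal sketch, not from the
original thread; it assumes jsc is the JavaSparkContext behind the streaming
context and leaves the actual Cassandra write to the connector):

import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;

kafkastream.foreachRDD((rdd, batchMilliSec) -> {
    OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    // parallelize the captured ranges into a small RDD on the driver
    JavaRDD<OffsetRange> offsetRdd = jsc.parallelize(Arrays.asList(offsets));
    // each OffsetRange exposes topic(), partition(), fromOffset() and
    // untilOffset(); map them into whatever row type the Cassandra table
    // expects before handing the RDD to the connector's writer
    return null;
});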


Re: Stateful Operation on JavaPairDStream Help Needed !!

2016-02-15 Thread Abhishek Anand
I am now trying to use mapWithState in the following way, based on some
example code. But, looking at the DAG, it does not seem to checkpoint the
state, and when restarting the application from the checkpoint it
re-partitions all the previous batches' data from Kafka.

static Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>> mappingFunc =
new Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>>() {
@Override
public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
State<MyClass> state) {
MyClass nullObj = new MyClass();
nullObj.setImprLog(null);
nullObj.setNotifyLog(null);
MyClass current = one.or(nullObj);

if(current!= null && current.getImprLog() != null &&
current.getMyClassType() == 1){
return new Tuple2<>(key, null);
}
else if (current.getNotifyLog() != null  && current.getMyClassType() == 3){
MyClass oldState = (state.exists() ? state.get() : nullObj);
if(oldState!= null && oldState.getNotifyLog() != null){
oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
return new Tuple2<>(key, oldState);
}
else{
return new Tuple2<>(key, null);
}
}
else{
return new Tuple2<>(key, null);
}

}
};


Please suggest if this is the proper way or am I doing something wrong.


Thanks !!
Abhi

On Sun, Feb 14, 2016 at 2:26 AM, Sebastian Piu <sebastian@gmail.com>
wrote:

> If you don't want to update your only option will be updateStateByKey then
> On 13 Feb 2016 8:48 p.m., "Ted Yu" <yuzhih...@gmail.com> wrote:
>
>> mapWithState supports checkpoint.
>>
>> There has been some bug fix since release of 1.6.0
>> e.g.
>>   SPARK-12591 NullPointerException using checkpointed mapWithState with
>> KryoSerializer
>>
>> which is in the upcoming 1.6.1
>>
>> Cheers
>>
>> On Sat, Feb 13, 2016 at 12:05 PM, Abhishek Anand <abhis.anan...@gmail.com
>> > wrote:
>>
>>> Does mapWithState checkpoints the data ?
>>>
>>> When my application goes down and is restarted from checkpoint, will
>>> mapWithState need to recompute the previous batches data ?
>>>
>>> Also, to use mapWithState I will need to upgrade my application as I am
>>> using version 1.4.0 and mapWithState isnt supported there. Is there any
>>> other work around ?
>>>
>>> Cheers!!
>>> Abhi
>>>
>>> On Fri, Feb 12, 2016 at 3:33 AM, Sebastian Piu <sebastian@gmail.com>
>>> wrote:
>>>
>>>> Looks like mapWithState could help you?
>>>> On 11 Feb 2016 8:40 p.m., "Abhishek Anand" <abhis.anan...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I have an use case like follows in my production environment where I
>>>>> am listening from kafka with slideInterval of 1 min and windowLength of 2
>>>>> hours.
>>>>>
>>>>> I have a JavaPairDStream where for each key I am getting the same key
>>>>> but with different value,which might appear in the same batch or some next
>>>>> batch.
>>>>>
>>>>> When the key appears second time I need to update a field in value of
>>>>> previous key with a field in the later key. The keys for which the
>>>>> combination keys do not come should be rejected after 2 hours.
>>>>>
>>>>> At the end of each second I need to output the result to external
>>>>> database.
>>>>>
>>>>> For example :
>>>>>
>>>>> Suppose valueX is object of MyClass with fields int a, String b
>>>>> At t=1sec I am getting
>>>>> key0,value0(0,"prev0")
>>>>> key1,value1 (1, "prev1")
>>>>> key2,value2 (2,"prev2")
>>>>> key2,value3 (3, "next2")
>>>>>
>>>>> Output to database after 1 sec
>>>>> key2, newValue (2,"next2")
>>>>>
>>>>> At t=2 sec getting
>>>>> key3,value4(4,"prev3")
>>>>> key1,value5(5,"next1")
>>>>>
>>>>> Output to database after 2 sec
>>>>> key1,newValue(1,"next1")
>>>>>
>>>>> At t=3 sec
>>>>> key4,value6(6,"prev4")
>>>>> key3,value7(7,"next3")
>>>>> key5,value5(8,"prev5")
>>>>> key5,value5(9,"next5")
>>>>> key0,value0(10,"next0")
>>>>>
>>>>> Output to database after 3 sec
>>>>> key0,newValue(0,"next0")
>>>>> key3,newValue(4,"next3")
>>>>> key5,newValue(8,"next5")
>>>>>
>>>>>
>>>>> Please suggest how this can be achieved.
>>>>>
>>>>>
>>>>> Thanks a lot 
>>>>> Abhi
>>>>>
>>>>>
>>>>>
>>>
>>
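
For reference, a sketch (not from the thread) of how the mapping function
above would typically be wired in on Spark 1.6: StateSpec carries the
function plus a timeout that drops keys which stay idle for the 2-hour
window, and checkpointing has to be enabled on the streaming context.

import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import scala.Tuple2;

// assumptions: `stream` is the JavaPairDStream<String, MyClass> built from
// Kafka, `jssc` is the JavaStreamingContext, and `mappingFunc` is the
// Function3 shown earlier in this thread
jssc.checkpoint("/path/to/checkpoint/dir");   // required for mapWithState

JavaMapWithStateDStream<String, MyClass, MyClass, Tuple2<String, MyClass>> stateStream =
    stream.mapWithState(
        StateSpec.function(mappingFunc)
                 .timeout(Durations.minutes(120)));   // expire idle keys after ~2 hours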


Re: Stateful Operation on JavaPairDStream Help Needed !!

2016-02-13 Thread Abhishek Anand
Does mapWithState checkpoints the data ?

When my application goes down and is restarted from checkpoint, will
mapWithState need to recompute the previous batches data ?

Also, to use mapWithState I will need to upgrade my application as I am
using version 1.4.0 and mapWithState isnt supported there. Is there any
other work around ?

Cheers!!
Abhi

On Fri, Feb 12, 2016 at 3:33 AM, Sebastian Piu <sebastian@gmail.com>
wrote:

> Looks like mapWithState could help you?
> On 11 Feb 2016 8:40 p.m., "Abhishek Anand" <abhis.anan...@gmail.com>
> wrote:
>
>> Hi All,
>>
>> I have an use case like follows in my production environment where I am
>> listening from kafka with slideInterval of 1 min and windowLength of 2
>> hours.
>>
>> I have a JavaPairDStream where for each key I am getting the same key but
>> with different value,which might appear in the same batch or some next
>> batch.
>>
>> When the key appears second time I need to update a field in value of
>> previous key with a field in the later key. The keys for which the
>> combination keys do not come should be rejected after 2 hours.
>>
>> At the end of each second I need to output the result to external
>> database.
>>
>> For example :
>>
>> Suppose valueX is object of MyClass with fields int a, String b
>> At t=1sec I am getting
>> key0,value0(0,"prev0")
>> key1,value1 (1, "prev1")
>> key2,value2 (2,"prev2")
>> key2,value3 (3, "next2")
>>
>> Output to database after 1 sec
>> key2, newValue (2,"next2")
>>
>> At t=2 sec getting
>> key3,value4(4,"prev3")
>> key1,value5(5,"next1")
>>
>> Output to database after 2 sec
>> key1,newValue(1,"next1")
>>
>> At t=3 sec
>> key4,value6(6,"prev4")
>> key3,value7(7,"next3")
>> key5,value5(8,"prev5")
>> key5,value5(9,"next5")
>> key0,value0(10,"next0")
>>
>> Output to database after 3 sec
>> key0,newValue(0,"next0")
>> key3,newValue(4,"next3")
>> key5,newValue(8,"next5")
>>
>>
>> Please suggest how this can be achieved.
>>
>>
>> Thanks a lot 
>> Abhi
>>
>>
>>


Re: Worker's BlockManager Folder not getting cleared

2016-02-13 Thread Abhishek Anand
Hi All,

Any ideas on this one ?

The size of this directory keeps on growing.

I can see there are many files from a day earlier too.

Cheers !!
Abhi

On Tue, Jan 26, 2016 at 7:13 PM, Abhishek Anand <abhis.anan...@gmail.com>
wrote:

> Hi Adrian,
>
> I am running spark in standalone mode.
>
> The spark version that I am using is 1.4.0
>
> Thanks,
> Abhi
>
> On Tue, Jan 26, 2016 at 4:10 PM, Adrian Bridgett <adr...@opensignal.com>
> wrote:
>
>> Hi Abhi - are you running on Mesos perchance?
>>
>> If so then with spark <1.6 you will be hitting
>> https://issues.apache.org/jira/browse/SPARK-10975
>> With spark >= 1.6:
>> https://issues.apache.org/jira/browse/SPARK-12430
>> and also be aware of:
>> https://issues.apache.org/jira/browse/SPARK-12583
>>
>>
>> On 25/01/2016 07:14, Abhishek Anand wrote:
>>
>> Hi All,
>>
>> How long the shuffle files and data files are stored on the block manager
>> folder of the workers.
>>
>> I have a spark streaming job with window duration of 2 hours and slide
>> interval of 15 minutes.
>>
>> When I execute the following command in my block manager path
>>
>> find . -type f -cmin +150 -name "shuffle*" -exec ls {} \;
>>
>> I see a lot of files which means that they are not getting cleared which
>> I was expecting that they should get cleared.
>>
>> Subsequently, this size keeps on increasing and takes space on the disk.
>>
>> Please suggest how to get rid of this and help on understanding this
>> behaviour.
>>
>>
>>
>> Thanks !!!
>> Abhi
>>
>>
>> --
>> *Adrian Bridgett* |  Sysadmin Engineer, OpenSignal
>> <http://www.opensignal.com>
>> _
>> Office: 3rd Floor, The Angel Office, 2 Angel Square, London, EC1V 1NY
>> Phone #: +44 777-377-8251
>> Skype: abridgett  |  @adrianbridgett <http://twitter.com/adrianbridgett>
>>   |  LinkedIn link  <https://uk.linkedin.com/in/abridgett>
>> _
>>
>
>


Stateful Operation on JavaPairDStream Help Needed !!

2016-02-11 Thread Abhishek Anand
Hi All,

I have a use case as follows in my production environment, where I am
listening to Kafka with a slideInterval of 1 min and a windowLength of 2
hours.

I have a JavaPairDStream where for each key I get the same key again but
with a different value, which might appear in the same batch or in some later
batch.

When the key appears a second time, I need to update a field in the previous
value with a field from the later value. Keys for which the matching second
value never arrives should be discarded after 2 hours.

At the end of each second I need to output the result to external database.

For example :

Suppose valueX is object of MyClass with fields int a, String b
At t=1sec I am getting
key0,value0(0,"prev0")
key1,value1 (1, "prev1")
key2,value2 (2,"prev2")
key2,value3 (3, "next2")

Output to database after 1 sec
key2, newValue (2,"next2")

At t=2 sec getting
key3,value4(4,"prev3")
key1,value5(5,"next1")

Output to database after 2 sec
key1,newValue(1,"next1")

At t=3 sec
key4,value6(6,"prev4")
key3,value7(7,"next3")
key5,value5(8,"prev5")
key5,value5(9,"next5")
key0,value0(10,"next0")

Output to database after 3 sec
key0,newValue(0,"next0")
key3,newValue(4,"next3")
key5,newValue(8,"next5")


Please suggest how this can be achieved.


Thanks a lot 
Abhi


Re: Repartition taking place for all previous windows even after checkpointing

2016-02-01 Thread Abhishek Anand
Any insights on this ?


On Fri, Jan 29, 2016 at 1:08 PM, Abhishek Anand <abhis.anan...@gmail.com>
wrote:

> Hi All,
>
> Can someone help me with the following doubts regarding checkpointing :
>
> My code flow is something like follows ->
>
> 1) create direct stream from kafka
> 2) repartition kafka stream
> 3)  mapToPair followed by reduceByKey
> 4)  filter
> 5)  reduceByKeyAndWindow without the inverse function
> 6)  write to cassandra
>
> Now when I restart my application from checkpoint, I see repartition and
> other steps being called for the previous windows which takes longer and
> delays my aggregations.
>
> My understanding  was that once data checkpointing is done it should not
> re-read from kafka and use the saved RDDs but guess I am wrong.
>
> Is there a way to avoid the repartition or any workaround for this.
>
> Spark Version is 1.4.0
>
> Cheers !!
> Abhi
>


Repartition taking place for all previous windows even after checkpointing

2016-01-28 Thread Abhishek Anand
Hi All,

Can someone help me with the following doubts regarding checkpointing :

My code flow is something like follows ->

1) create direct stream from kafka
2) repartition kafka stream
3)  mapToPair followed by reduceByKey
4)  filter
5)  reduceByKeyAndWindow without the inverse function
6)  write to cassandra

Now when I restart my application from checkpoint, I see repartition and
other steps being called for the previous windows which takes longer and
delays my aggregations.

My understanding was that once data checkpointing is done it should not
re-read from Kafka but use the saved RDDs, but I guess I am wrong.

Is there a way to avoid the repartition or any workaround for this.

Spark Version is 1.4.0

Cheers !!
Abhi


Re: Worker's BlockManager Folder not getting cleared

2016-01-26 Thread Abhishek Anand
Hi Adrian,

I am running spark in standalone mode.

The spark version that I am using is 1.4.0

Thanks,
Abhi

On Tue, Jan 26, 2016 at 4:10 PM, Adrian Bridgett <adr...@opensignal.com>
wrote:

> Hi Abhi - are you running on Mesos perchance?
>
> If so then with spark <1.6 you will be hitting
> https://issues.apache.org/jira/browse/SPARK-10975
> With spark >= 1.6:
> https://issues.apache.org/jira/browse/SPARK-12430
> and also be aware of:
> https://issues.apache.org/jira/browse/SPARK-12583
>
>
> On 25/01/2016 07:14, Abhishek Anand wrote:
>
> Hi All,
>
> How long the shuffle files and data files are stored on the block manager
> folder of the workers.
>
> I have a spark streaming job with window duration of 2 hours and slide
> interval of 15 minutes.
>
> When I execute the following command in my block manager path
>
> find . -type f -cmin +150 -name "shuffle*" -exec ls {} \;
>
> I see a lot of files which means that they are not getting cleared which I
> was expecting that they should get cleared.
>
> Subsequently, this size keeps on increasing and takes space on the disk.
>
> Please suggest how to get rid of this and help on understanding this
> behaviour.
>
>
>
> Thanks !!!
> Abhi
>
>
> --
> *Adrian Bridgett* |  Sysadmin Engineer, OpenSignal
> <http://www.opensignal.com>
> _
> Office: 3rd Floor, The Angel Office, 2 Angel Square, London, EC1V 1NY
> Phone #: +44 777-377-8251
> Skype: abridgett  |  @adrianbridgett <http://twitter.com/adrianbridgett>  |
>  LinkedIn link  <https://uk.linkedin.com/in/abridgett>
> _
>


Worker's BlockManager Folder not getting cleared

2016-01-24 Thread Abhishek Anand
Hi All,

How long are the shuffle files and data files stored in the block manager
folder of the workers?

I have a spark streaming job with window duration of 2 hours and slide
interval of 15 minutes.

When I execute the following command in my block manager path

find . -type f -cmin +150 -name "shuffle*" -exec ls {} \;

I see a lot of files, which means they are not getting cleared even though I
expected them to be.

Subsequently, this size keeps on increasing and takes space on the disk.

Please suggest how to get rid of this and help on understanding this
behaviour.



Thanks !!!
Abhi


Getting kafka offsets at beginning of spark streaming application

2016-01-11 Thread Abhishek Anand
Hi,

Is there a way to fetch the offsets from which Spark Streaming starts reading
from Kafka when my application starts?

What I am trying to do is create an initial RDD spanning the offsets at a
particular time (passed as input from the command line) up to the offsets
from which my Spark Streaming job starts.

Eg -

Partition 0 -> 1000 to (offset at which my spark streaming starts)

Thanks !!
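
A small sketch (not from the original thread) of one way to see where the
direct stream actually starts: the first batch carries its starting offsets
in the OffsetRange metadata, so they can be read inside foreachRDD. This
assumes `directStream` was created with KafkaUtils.createDirectStream; there
is also a createDirectStream overload that takes an explicit fromOffsets map
if the starting point needs to be set rather than discovered.

import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;

directStream.foreachRDD(rdd -> {
    // in the very first batch, fromOffset() is where the stream began reading
    for (OffsetRange o : ((HasOffsetRanges) rdd.rdd()).offsetRanges()) {
        System.out.println(o.topic() + "-" + o.partition()
                + " starts at offset " + o.fromOffset());
    }
    return null;
});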


Re: [Spark-SQL] Custom aggregate function for GrouppedData

2016-01-07 Thread Abhishek Gayakwad
Thanks Michael for replying. Aggregator/UDAF is exactly what I am looking
for, but we are still on 1.4 and it's going to take time to get to 1.6.

On Wed, Jan 6, 2016 at 10:32 AM, Michael Armbrust <mich...@databricks.com>
wrote:

> In Spark 1.6 GroupedDataset
> <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.GroupedDataset>
>  has
> mapGroups, which sounds like what you are looking for.  You can also write
> a custom Aggregator
> <https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Dataset%20Aggregator.html>
>
> On Tue, Jan 5, 2016 at 8:14 PM, Abhishek Gayakwad <a.gayak...@gmail.com>
> wrote:
>
>> Hello Hivemind,
>>
>> Referring to this thread -
>> https://forums.databricks.com/questions/956/how-do-i-group-my-dataset-by-a-key-or-combination.html.
>> I have learnt that we can not do much with groupped data apart from using
>> existing aggregate functions. This blog post was written in may 2015, I
>> don't know if things are changes from that point of time. I am using 1.4
>> version of spark.
>>
>> What I am trying to achieve is something very similar to collectset in
>> hive (actually unique ordered concated values.) e.g.
>>
>> 1,2
>> 1,3
>> 2,4
>> 2,5
>> 2,4
>>
>> to
>> 1, "2,3"
>> 2, "4,5"
>>
>> Currently I am achieving this by converting dataframe to RDD, do the
>> required operations and convert it back to dataframe as shown below.
>>
>> public class AvailableSizes implements Serializable {
>>
>> public DataFrame calculate(SQLContext ssc, DataFrame salesDataFrame) {
>> final JavaRDD rowJavaRDD = salesDataFrame.toJavaRDD();
>>
>> JavaPairRDD<String, Row> pairs = rowJavaRDD.mapToPair(
>> (PairFunction<Row, String, Row>) row -> {
>> final Object[] objects = {row.getAs(0), row.getAs(1), 
>> row.getAs(3)};
>> return new 
>> Tuple2<>(row.getAs(SalesColumns.STYLE.name()), new 
>> GenericRowWithSchema(objects, SalesColumns.getOutputSchema()));
>> });
>>
>> JavaPairRDD<String, Row> withSizeList = pairs.reduceByKey(new 
>> Function2<Row, Row, Row>() {
>> @Override
>> public Row call(Row aRow, Row bRow) {
>> final String uniqueCommaSeparatedSizes = uniqueSizes(aRow, 
>> bRow);
>> final Object[] objects = {aRow.getAs(0), aRow.getAs(1), 
>> uniqueCommaSeparatedSizes};
>> return new GenericRowWithSchema(objects, 
>> SalesColumns.getOutputSchema());
>> }
>>
>> private String uniqueSizes(Row aRow, Row bRow) {
>> final SortedSet allSizes = new TreeSet<>();
>> final List aSizes = Arrays.asList(((String) 
>> aRow.getAs(String.valueOf(SalesColumns.SIZE))).split(","));
>> final List bSizes = Arrays.asList(((String) 
>> bRow.getAs(String.valueOf(SalesColumns.SIZE))).split(","));
>> allSizes.addAll(aSizes);
>> allSizes.addAll(bSizes);
>> return csvFormat(allSizes);
>> }
>> });
>>
>> final JavaRDD values = withSizeList.values();
>>
>> return ssc.createDataFrame(values, SalesColumns.getOutputSchema());
>>
>> }
>>
>> public String csvFormat(Collection collection) {
>> return 
>> collection.stream().map(Object::toString).collect(Collectors.joining(","));
>> }
>> }
>>
>> Please suggest if there is a better way of doing this.
>>
>> Regards,
>> Abhishek
>>
>
>


[Spark-SQL] Custom aggregate function for GrouppedData

2016-01-05 Thread Abhishek Gayakwad
Hello Hivemind,

Referring to this thread -
https://forums.databricks.com/questions/956/how-do-i-group-my-dataset-by-a-key-or-combination.html.
I have learnt that we cannot do much with grouped data apart from using the
existing aggregate functions. That post was written in May 2015, and I don't
know whether things have changed since then. I am using Spark 1.4.

What I am trying to achieve is something very similar to collect_set in Hive
(actually unique, ordered, comma-concatenated values), e.g.

1,2
1,3
2,4
2,5
2,4

to
1, "2,3"
2, "4,5"

Currently I am achieving this by converting dataframe to RDD, do the
required operations and convert it back to dataframe as shown below.

public class AvailableSizes implements Serializable {

public DataFrame calculate(SQLContext ssc, DataFrame salesDataFrame) {
final JavaRDD<Row> rowJavaRDD = salesDataFrame.toJavaRDD();

JavaPairRDD<String, Row> pairs = rowJavaRDD.mapToPair(
(PairFunction<Row, String, Row>) row -> {
final Object[] objects = {row.getAs(0),
row.getAs(1), row.getAs(3)};
return new
Tuple2<>(row.getAs(SalesColumns.STYLE.name()), new
GenericRowWithSchema(objects, SalesColumns.getOutputSchema()));
});

JavaPairRDD<String, Row> withSizeList = pairs.reduceByKey(new
Function2<Row, Row, Row>() {
@Override
public Row call(Row aRow, Row bRow) {
final String uniqueCommaSeparatedSizes =
uniqueSizes(aRow, bRow);
final Object[] objects = {aRow.getAs(0),
aRow.getAs(1), uniqueCommaSeparatedSizes};
return new GenericRowWithSchema(objects,
SalesColumns.getOutputSchema());
}

private String uniqueSizes(Row aRow, Row bRow) {
final SortedSet<String> allSizes = new TreeSet<>();
final List<String> aSizes = Arrays.asList(((String)
aRow.getAs(String.valueOf(SalesColumns.SIZE))).split(","));
final List<String> bSizes = Arrays.asList(((String)
bRow.getAs(String.valueOf(SalesColumns.SIZE))).split(","));
allSizes.addAll(aSizes);
allSizes.addAll(bSizes);
return csvFormat(allSizes);
}
});

final JavaRDD<Row> values = withSizeList.values();

return ssc.createDataFrame(values, SalesColumns.getOutputSchema());

}

public String csvFormat(Collection<?> collection) {
return 
collection.stream().map(Object::toString).collect(Collectors.joining(","));
}
}

Please suggest if there is a better way of doing this.

Regards,
Abhishek
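
A somewhat leaner take on the same idea (still Spark 1.4 / RDD based, a
sketch rather than the author's code): key by the style column, keep the
sizes as a comma-joined string, and merge with a TreeSet so the values stay
unique and ordered. `salesRows` is assumed to be a JavaPairRDD<String, String>
of (style, size) pairs already extracted from the DataFrame.

import java.util.Arrays;
import java.util.TreeSet;
import org.apache.spark.api.java.JavaPairRDD;

JavaPairRDD<String, String> uniqueSizesByStyle = salesRows.reduceByKey((a, b) -> {
    // merge the two comma-joined size lists, de-duplicating and sorting them
    TreeSet<String> all = new TreeSet<>(Arrays.asList(a.split(",")));
    all.addAll(Arrays.asList(b.split(",")));
    return String.join(",", all);
});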


Error on using updateStateByKey

2015-12-18 Thread Abhishek Anand
I am trying to use updateStateByKey but receiving the following error.
(Spark Version 1.4.0)

Can someone please point out what might be the possible reason for this
error.


*The method
updateStateByKey(Function2)
in the type JavaPairDStream is not applicable
for the arguments *
* 
(Function2)*


This is the update function that I am using inside updateStateByKey.

I am applying updateStateByKey on a tuple of 

private static Function2 updateFunction =
new Function2() {
/**
*
*/
private static final long serialVersionUID = 1L;

@Override
public Optional call(List values,
Optional current) {
AggregationMetrics newSum = current.or(new AggregationMetrics(0L, 0L, 0L));
for(int i=0; i < values.size(); i++)
{
//set with new values
}
return Optional.of(newSum);
}
};



Thanks,
Abhi
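
For comparison, here is a fully parameterised sketch that lines up with the
1.4 Java API (the value class MetricValue is a made-up stand-in, since the
real value type is not visible in the archived message; the error above is
typically caused by the Function2's type parameters not matching the
DStream's value and state types exactly, or by using raw types):

import java.util.List;
import com.google.common.base.Optional;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.api.java.JavaPairDStream;

Function2<List<MetricValue>, Optional<AggregationMetrics>, Optional<AggregationMetrics>> updateFunction =
    new Function2<List<MetricValue>, Optional<AggregationMetrics>, Optional<AggregationMetrics>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Optional<AggregationMetrics> call(List<MetricValue> values,
                Optional<AggregationMetrics> current) {
            AggregationMetrics newSum = current.or(new AggregationMetrics(0L, 0L, 0L));
            for (MetricValue v : values) {
                // fold v into newSum here
            }
            return Optional.of(newSum);
        }
    };

// pairDstream is assumed to be a JavaPairDStream<String, MetricValue>
JavaPairDStream<String, AggregationMetrics> stateDstream =
    pairDstream.updateStateByKey(updateFunction);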


Re: PairRDD(K, L) to multiple files by key serializing each value in L before

2015-12-16 Thread Abhishek Shivkumar
Hello Daniel,

  I was thinking you could write something like this:

import json

def create_and_write_file(line):
    # line[0] is the key; name the output file after it
    with open("cat-{}.txt".format(line[0]), "w") as f:
        # line[1] is the list of values belonging to this key
        for ele in line[1]:
            f.write(json.dumps(ele) + "\n")

# foreach is an action, so the writes actually run on the workers;
# a bare map would stay lazy and never execute them
catGroupArr.foreach(create_and_write_file)

Do you think this works?

Thanks
Abhishek S


Thank you!

With Regards,
Abhishek S

On Wed, Dec 16, 2015 at 1:05 AM, Daniel Valdivia <h...@danielvaldivia.com>
wrote:

> Hello everyone,
>
> I have a PairRDD with a set of key and list of values, each value in the
> list is a json which I already loaded beginning of my spark app, how can I
> iterate over each value of the list in my pair RDD to transform it to a
> string then save the whole content of the key to a file? one file per key
>
> my input files look like cat-0-500.txt:
>
> *{cat:'red',value:'asd'}*
> *{cat:'green',value:'zxc'}*
> *{cat:'red',value:'jkl'}*
>
> The PairRDD looks like
>
> *('red', [{cat:'red',value:'asd'},{cat:'red',value:'jkl'}])*
> *('green', [{cat:'green',value:'zxc'}])*
>
> so as you can see I I'd like to serialize each json in the value list back
> to string so I can easily saveAsTextFile(), ofcourse I'm trying to save a
> separate file for each key
>
> The way I got here:
>
> *rawcatRdd = sc.textFile("hdfs://x.x.x.../unstructured/cat-0-500.txt")*
> *import json*
> *categoriesJson = rawcatRdd.map(lambda x: json.loads(x))*
> *categories = categoriesJson*
>
> *catByDate = categories.map(lambda x: (x['cat'], x)*
> *catGroup = catByDate.groupByKey()*
> *catGroupArr = catGroup.mapValues(lambda x : list(x))*
>
> Ideally I want to create a cat-red.txt that looks like:
>
> {cat:'red',value:'asd'}
> {cat:'red',value:'jkl'}
>
> and the same for the rest of the keys.
>
> I already looked at this answer
> <http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job>
>  but
> I'm slightly lost as host to process each value in the list (turn into
> string) before I save the contents to a file, also I cannot figure out how
> to import *MultipleTextOutputFormat* in python either.
>
> I'm trying all this wacky stuff in the pyspark shell
>
> Any advice would be greatly appreciated
>
> Thanks in advance!
>


How to unpack the values of an item in a RDD so I can create a RDD with multiple items?

2015-12-13 Thread Abhishek Shivkumar
Hi,

I have an RDD of many items.

Each item has a key and its value is a list of elements.

I want to unpack the elements of each item so that I can create a new RDD in
which each item is the original key paired with a single element.

I tried doing RDD.flatmap(lambda line: [ (line[0], v) for v in line[1]])

but it throws an error saying "AttributeError: 'PipelinedRDD' object has no
attribute 'flatmap"

Can someone tell me the right way to unpack the values to different items
in the new RDD?

Thank you!

With Regards,
Abhishek S
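
The method name is case-sensitive: RDDs (including PipelinedRDD) expose
flatMap, not flatmap. A minimal sketch, assuming rdd holds the
(key, list-of-elements) items described above:

# one (key, element) pair per element of the original item's value list
pairs = rdd.flatMap(lambda item: [(item[0], v) for v in item[1]])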


Is it possible to pass additional parameters to a python function when used inside RDD.filter method?

2015-12-04 Thread Abhishek Shivkumar

Hi,

 I am using spark with python and I have a filter constraint as follows:

my_rdd.filter(my_func)

where my_func is a method I wrote to filter the RDD items based on my own
logic. I have defined my_func as follows:


def my_func(my_item):
    ...

Now, I want to pass another separate parameter to my_func, besides the 
item that goes into it. How can I do that? I know my_item will refer to 
one item that comes from my_rdd and how can I pass my own parameter 
(let's say my_param) as an additional parameter to my_func?


Thanks
Abhishek S
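
Two common ways to do this (a sketch, assuming my_rdd and the extra value
my_param already exist): capture the extra argument in a lambda, or pre-bind
it with functools.partial.

from functools import partial

def my_func(my_item, my_param):
    # keep the item only if it passes whatever check involves my_param
    return my_item != my_param

# 1) capture the extra argument with a lambda
filtered = my_rdd.filter(lambda item: my_func(item, my_param))

# 2) or bind it up front with functools.partial
filtered = my_rdd.filter(partial(my_func, my_param=my_param))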



