Re: Append In-Place to S3

2018-06-07 Thread Benjamin Kim
I tried a different tactic. I still append based on the query below, but I add 
another deduping step afterwards, writing to a staging directory then 
overwriting back. Luckily, the data is small enough for this to happen fast.
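
For anyone reading this in the archives, a minimal PySpark sketch of that extra 
dedupe-and-overwrite step might look like the following. It reuses 
existing_data_path from the original post quoted below; staging_path is a 
placeholder, and it assumes the data set is small enough to rewrite in full.

# After the append step, re-read the full data set and drop exact duplicates.
full_df = spark.read.parquet(existing_data_path)
deduped_df = full_df.dropDuplicates()

# Write the deduped result to a staging directory first...
deduped_df.coalesce(1).write.parquet(staging_path, mode='overwrite', compression='gzip')

# ...then read it back from staging and overwrite the original location.
spark.read.parquet(staging_path) \
    .write.parquet(existing_data_path, mode='overwrite', compression='gzip')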

Cheers,
Ben

> On Jun 3, 2018, at 3:02 PM, Tayler Lawrence Jones wrote:
> 
> Sorry, actually my last message is not true for anti join; I was thinking of 
> semi join.
> 
> -TJ
> 
> On Sun, Jun 3, 2018 at 14:57 Tayler Lawrence Jones <t.jonesd...@gmail.com> wrote:
> A left join with null filter is only the same as a left anti join if the join 
> keys can be guaranteed unique in the existing data. Since hive tables on s3 
> offer no unique guarantees outside of your processing code, I recommend using 
> left anti join over left join + null filter.
> 
> -TJ
> 
> On Sun, Jun 3, 2018 at 14:47 ayan guha <guha.a...@gmail.com> wrote:
> I do not use anti join semantics, but you can use a left outer join and then 
> filter out nulls from the right side. Your data may have dups on the columns 
> separately, but it should not have dups on the composite key, i.e., all 
> columns put together.
> 
> On Mon, 4 Jun 2018 at 6:42 am, Tayler Lawrence Jones <t.jonesd...@gmail.com> wrote:
> The issue is not append vs overwrite; perhaps those responders do not know 
> anti join semantics. Further, overwrite on S3 is a bad pattern due to S3 
> eventual consistency issues. 
> 
> First, your SQL query is wrong, as you don’t close the parenthesis of the CTE 
> (the “WITH” part). In fact, it looks like you don’t need that WITH at all, and 
> the query should fail to parse. If it does parse, I would open a bug on the 
> Spark JIRA.
> 
> Can you provide the query that you are using to detect duplication so I can 
> see if your deduplication logic matches the detection query? 
> 
> -TJ
> 
> On Sat, Jun 2, 2018 at 10:22 Aakash Basu <aakash.spark@gmail.com> wrote:
> As Jay suggested correctly, if you're joining then overwrite otherwise only 
> append as it removes dups.
> 
> I think, in this scenario, just change it to write.mode('overwrite') because 
> you're already reading the old data and your job would be done.
> 
> 
> On Sat 2 Jun, 2018, 10:27 PM Benjamin Kim <bbuil...@gmail.com> wrote:
> Hi Jay,
> 
> Thanks for your response. Are you saying to append the new data, then remove 
> the duplicates from the whole data set afterwards, overwriting the existing 
> data set with the new, appended one? I will give that a try.
> 
> Cheers,
> Ben
> 
> On Fri, Jun 1, 2018 at 11:49 PM Jay <jayadeep.jayara...@gmail.com> wrote:
> Benjamin,
> 
> The append will append the "new" data to the existing data without removing 
> the duplicates. You would need to overwrite the file every time if you need 
> unique values.
> 
> Thanks,
> Jayadeep
> 
> On Fri, Jun 1, 2018 at 9:31 PM Benjamin Kim <bbuil...@gmail.com> wrote:
> I have a situation where I am trying to add only new rows to an existing data 
> set that lives in S3 as gzipped Parquet files, looping and appending for each 
> hour of the day. First, I create a DF from the existing data, then I use a 
> query to create another DF with the data that is new. Here is the code 
> snippet.
> 
> df = spark.read.parquet(existing_data_path)
> df.createOrReplaceTempView(‘existing_data’)
> new_df = spark.read.parquet(new_data_path)
> new_df.createOrReplaceTempView(’new_data’)
> append_df = spark.sql(
> """
> WITH ids AS (
> SELECT DISTINCT
> source,
> source_id,
> target,
> target_id
> FROM new_data i
> LEFT ANTI JOIN existing_data im
> ON i.source = im.source
> AND i.source_id = im.source_id
> AND i.target = im.target
> AND i.target = im.target_id
> """
> )
> append_df.coalesce(1).write.parquet(existing_data_path, mode='append', 
> compression='gzip’)
> 
> I thought this would append new rows and keep the data unique, but I am seeing 
> many duplicates. Can someone help me with this and tell me what I am doing 
> wrong?
> 
> Thanks,
> Ben
> -- 
> Best Regards,
> Ayan Guha



Re: Append In-Place to S3

2018-06-02 Thread Benjamin Kim
Hi Jay,

Thanks for your response. Are you saying to append the new data, then remove
the duplicates from the whole data set afterwards, overwriting the existing
data set with the new, appended one? I will give that a try.

Cheers,
Ben

On Fri, Jun 1, 2018 at 11:49 PM Jay  wrote:

> Benjamin,
>
> The append will append the "new" data to the existing data without removing
> the duplicates. You would need to overwrite the file every time if you need
> unique values.
>
> Thanks,
> Jayadeep
>
> On Fri, Jun 1, 2018 at 9:31 PM Benjamin Kim  wrote:
>
>> I have a situation where I am trying to add only new rows to an existing
>> data set that lives in S3 as gzipped Parquet files, looping and appending
>> for each hour of the day. First, I create a DF from the existing data, then
>> I use a query to create another DF with the data that is new. Here is the
>> code snippet.
>>
>> df = spark.read.parquet(existing_data_path)
>> df.createOrReplaceTempView(‘existing_data’)
>> new_df = spark.read.parquet(new_data_path)
>> new_df.createOrReplaceTempView(’new_data’)
>> append_df = spark.sql(
>> """
>> WITH ids AS (
>> SELECT DISTINCT
>> source,
>> source_id,
>> target,
>> target_id
>> FROM new_data i
>> LEFT ANTI JOIN existing_data im
>> ON i.source = im.source
>> AND i.source_id = im.source_id
>> AND i.target = im.target
>> AND i.target = im.target_id
>> """
>> )
>> append_df.coalesce(1).write.parquet(existing_data_path, mode='append',
>> compression='gzip’)
>>
>>
>> I thought this would append new rows and keep the data unique, but I am
>> seeing many duplicates. Can someone help me with this and tell me what I am
>> doing wrong?
>>
>> Thanks,
>> Ben
>>
>


Append In-Place to S3

2018-06-01 Thread Benjamin Kim
I have a situation where I am trying to add only new rows to an existing data set 
that lives in S3 as gzipped Parquet files, looping and appending for each hour 
of the day. First, I create a DF from the existing data, then I use a query to 
create another DF with the data that is new. Here is the code snippet.

df = spark.read.parquet(existing_data_path)
df.createOrReplaceTempView(‘existing_data’)
new_df = spark.read.parquet(new_data_path)
new_df.createOrReplaceTempView(’new_data’)
append_df = spark.sql(
"""
WITH ids AS (
SELECT DISTINCT
source,
source_id,
target,
target_id
FROM new_data i
LEFT ANTI JOIN existing_data im
ON i.source = im.source
AND i.source_id = im.source_id
AND i.target = im.target
AND i.target = im.target_id
"""
)
append_df.coalesce(1).write.parquet(existing_data_path, mode='append', 
compression='gzip’)

I thought this would append new rows and keep the data unique, but I am seeing 
many duplicates. Can someone help me with this and tell me what I am doing 
wrong?
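
As the replies above point out, the WITH clause in the query is never closed 
(and isn't really needed); note also that the last join condition compares 
i.target to im.target_id instead of i.target_id to im.target_id. A corrected 
sketch of the append query, keeping the same column names, might look like this:

append_df = spark.sql(
"""
SELECT DISTINCT
    i.source,
    i.source_id,
    i.target,
    i.target_id
FROM new_data i
LEFT ANTI JOIN existing_data im
    ON i.source = im.source
    AND i.source_id = im.source_id
    AND i.target = im.target
    AND i.target_id = im.target_id
"""
)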

Thanks,
Ben

Re: Spark 2.2 Structured Streaming + Kinesis

2017-11-13 Thread Benjamin Kim
To add, we have a CDH 5.12 cluster with Spark 2.2 in our data center.

On Mon, Nov 13, 2017 at 3:15 PM Benjamin Kim <bbuil...@gmail.com> wrote:

> Does anyone know if there is a connector for AWS Kinesis that can be used
> as a source for Structured Streaming?
>
> Thanks.
>
>


Databricks Serverless

2017-11-13 Thread Benjamin Kim
I have a question about this. The documentation compares the concept
similar to BigQuery. Does this mean that we will no longer need to deal
with instances and just pay for execution duration and amount of data
processed? I’m just curious about how this will be priced.

Also, when will it be ready for production?

Cheers.


Spark 2.2 Structured Streaming + Kinesis

2017-11-13 Thread Benjamin Kim
Does anyone know if there is a connector for AWS Kinesis that can be used
as a source for Structured Streaming?

Thanks.


Serverless ETL

2017-10-17 Thread Benjamin Kim
With AWS having Glue and GCE having Dataprep, is Databricks coming out with
an equivalent or better? I know that Serverless is a new offering, but will
it go farther with automatic data schema discovery, profiling, metadata
storage, change triggering, joining, transform suggestions, etc.?

Just curious.

Cheers,
Ben


Glue-like Functionality

2017-07-08 Thread Benjamin Kim
Has anyone seen AWS Glue? I was wondering if something similar is going to be 
built into Spark Structured Streaming? I like the Data Catalog idea to store and 
track any data source/destination. It profiles the data to derive the schema and 
data types. Also, it does some sort of automated schema evolution when or if the 
schema changes. It leaves only the transformation logic to the ETL developer.

I think some of this can enhance or simplify Structured Streaming. For example, 
AWS S3 can be catalogued as a Data Source; in Structured Streaming, the Input 
DataFrame is created like a SQL view based off of the S3 Data Source; lastly, 
the Transform logic, if any, just manipulates the data going from the Input 
DataFrame to the Result DataFrame, which is another view based off of a 
catalogued Data Destination. This would relieve the ETL developer from caring 
about any Data Source or Destination. All server information, access 
credentials, data schemas, folder directory structures, file formats, and any 
other properties could be securely stored away, accessible to only a 
select few.
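
As a rough illustration of the flow described above, here is a minimal 
Structured Streaming sketch (the bucket names, paths, and columns are 
placeholders, and the catalog service that would supply them is hypothetical):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("catalog-driven-etl").getOrCreate()

# Schema that a catalog would normally supply (placeholder columns).
schema = StructType([
    StructField("source", StringType()),
    StructField("target", StringType()),
])

# "Data Source": an S3 location that would be registered in the catalog.
input_df = spark.readStream.schema(schema).parquet("s3a://example-bucket/incoming/")

# Transform logic: the only piece left to the ETL developer.
result_df = input_df.filter("target IS NOT NULL")

# "Data Destination": another catalogued location plus a checkpoint directory.
query = (result_df.writeStream
    .format("parquet")
    .option("path", "s3a://example-bucket/curated/")
    .option("checkpointLocation", "s3a://example-bucket/checkpoints/curated/")
    .start())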

I'm just curious to know if anyone has thought the same thing.

Cheers,
Ben
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Use SQL Script to Write Spark SQL Jobs

2017-06-12 Thread Benjamin Kim
Hi Bo,

+1 for your project. I come from the world of data warehouses, ETL, and 
reporting analytics. There are many individuals who do not know or want to do 
any coding. They are content with ANSI SQL and stick to it. ETL workflows are 
also done without any coding using a drag-and-drop user interface, such as 
Talend, SSIS, etc. There is a small amount of scripting involved but not too 
much. I looked at what you are trying to do, and I welcome it. This could open 
up Spark to the masses and shorten development times.

Cheers,
Ben


> On Jun 12, 2017, at 10:14 PM, bo yang  wrote:
> 
> Hi Aakash,
> 
> Thanks for your willing to help :) It will be great if I could get more 
> feedback on my project. For example, is there any other people feeling the 
> need of using a script to write Spark job easily? Also, I would explore 
> whether it is possible that the Spark project takes some work to build such a 
> script based high level DSL.
> 
> Best,
> Bo
> 
> 
> On Mon, Jun 12, 2017 at 12:14 PM, Aakash Basu  > wrote:
> Hey,
> 
> I work on Spark SQL and would pretty much be able to help you in this. Let me 
> know your requirement.
> 
> Thanks,
> Aakash.
> 
> On 12-Jun-2017 11:00 AM, "bo yang"  > wrote:
> Hi Guys,
> 
> I am writing a small open source project 
>  to use SQL Script to write Spark 
> Jobs. Want to see if there are other people interested to use or contribute 
> to this project.
> 
> The project is called UberScriptQuery 
> (https://github.com/uber/uberscriptquery 
> ). Sorry for the dumb name to avoid 
> conflict with many other names (Spark is registered trademark, thus I could 
> not use Spark in my project name).
> 
> In short, it is a high level SQL-like DSL (Domain Specific Language) on top 
> of Spark. People can use that DSL to write Spark jobs without worrying about 
> Spark internal details. Please check README 
>  in the project to get more details.
> 
> It will be great if I could get any feedback or suggestions!
> 
> Best,
> Bo
> 
> 



Spark 2.1 and Hive Metastore

2017-04-09 Thread Benjamin Kim
I’m curious about if and when Spark SQL will ever remove its dependency on Hive 
Metastore. Now that Spark 2.1’s SparkSession has superseded the need for 
HiveContext, are there plans for Spark to no longer use the Hive Metastore 
service with a “SparkSchema” service with a PostgreSQL, MySQL, etc. DB backend? 
Hive is growing long in the tooth, and it would be nice to retire it someday.

Cheers,
Ben
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Get S3 Parquet File

2017-02-24 Thread Benjamin Kim
Gourav,

I’ll start experimenting with Spark 2.1 to see if this works.

Cheers,
Ben


> On Feb 24, 2017, at 5:46 AM, Gourav Sengupta <gourav.sengu...@gmail.com> 
> wrote:
> 
> Hi Benjamin,
> 
> First of all, fetching data from S3 while running your code on an on-premise 
> system is a very bad idea. You might want to first copy the data into local 
> HDFS before running your code. Of course, this depends on the volume of data 
> and internet speed that you have.
> 
> The platform which makes your data at least 10 times faster is Spark 2.1. And 
> trust me, you do not want to be writing code which needs you to update it once 
> again in 6 months because newer versions of Spark now find it deprecated.
> 
> 
> Regards,
> Gourav Sengupta
> 
> 
> 
> On Fri, Feb 24, 2017 at 7:18 AM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Hi Gourav,
> 
> My answers are below.
> 
> Cheers,
> Ben
> 
> 
>> On Feb 23, 2017, at 10:57 PM, Gourav Sengupta <gourav.sengu...@gmail.com 
>> <mailto:gourav.sengu...@gmail.com>> wrote:
>> 
>> Can I ask where you are running your CDH? Is it on premise or have you 
>> created a cluster for yourself in AWS? Our cluster is on premise in our data 
>> center.
>> 
>> Also, I have really never seen s3a used before; that was used way back when 
>> writing S3 files took a long time, but I think that you are reading it. 
>> 
>> Any ideas why you are not migrating to Spark 2.1? Besides speed, there are 
>> lots of APIs which are new, and the existing ones are being deprecated. 
>> Therefore there is a very high chance that you are already working on code 
>> which is being deprecated by the Spark community right now. We use CDH and 
>> upgrade with whatever Spark version they include, which is 1.6.0. We are 
>> waiting for the move to Spark 2.0/2.1.
>> 
>> And besides that, would you not want to work on a platform which is at least 
>> 10 times faster? What would that be?
>> 
>> Regards,
>> Gourav Sengupta
>> 
>> On Thu, Feb 23, 2017 at 6:23 PM, Benjamin Kim <bbuil...@gmail.com 
>> <mailto:bbuil...@gmail.com>> wrote:
>> We are trying to use Spark 1.6 within CDH 5.7.1 to retrieve a 1.3GB Parquet 
>> file from AWS S3. We can read the schema and show some data when the file is 
>> loaded into a DataFrame, but when we try to do some operations, such as 
>> count, we get this error below.
>> 
>> com.cloudera.com.amazonaws.AmazonClientException: Unable to load AWS 
>> credentials from any provider in the chain
>> at 
>> com.cloudera.com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
>> at 
>> com.cloudera.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3779)
>> at 
>> com.cloudera.com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1107)
>> at 
>> com.cloudera.com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1070)
>> at 
>> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:239)
>> at 
>> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2711)
>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:97)
>> at 
>> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2748)
>> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2730)
>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:385)
>> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
>> at 
>> parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:385)
>> at 
>> parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:162)
>> at 
>> parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:145)
>> at 
>> org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.(SqlNewHadoopRDD.scala:180)
>> at 
>> org.apache.spark.rdd.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:126)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>> at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>> at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>

Re: Get S3 Parquet File

2017-02-23 Thread Benjamin Kim
Hi Gourav,

My answers are below.

Cheers,
Ben


> On Feb 23, 2017, at 10:57 PM, Gourav Sengupta <gourav.sengu...@gmail.com> 
> wrote:
> 
> Can I ask where you are running your CDH? Is it on premise or have you 
> created a cluster for yourself in AWS? Our cluster is on premise in our data 
> center.
> 
> Also, I have really never seen s3a used before; that was used way back when 
> writing S3 files took a long time, but I think that you are reading it. 
> 
> Any ideas why you are not migrating to Spark 2.1? Besides speed, there are 
> lots of APIs which are new, and the existing ones are being deprecated. 
> Therefore there is a very high chance that you are already working on code 
> which is being deprecated by the Spark community right now. We use CDH and 
> upgrade with whatever Spark version they include, which is 1.6.0. We are 
> waiting for the move to Spark 2.0/2.1.
> 
> And besides that, would you not want to work on a platform which is at least 
> 10 times faster? What would that be?
> 
> Regards,
> Gourav Sengupta
> 
> On Thu, Feb 23, 2017 at 6:23 PM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> We are trying to use Spark 1.6 within CDH 5.7.1 to retrieve a 1.3GB Parquet 
> file from AWS S3. We can read the schema and show some data when the file is 
> loaded into a DataFrame, but when we try to do some operations, such as 
> count, we get this error below.
> 
> com.cloudera.com.amazonaws.AmazonClientException: Unable to load AWS 
> credentials from any provider in the chain
> at 
> com.cloudera.com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
> at 
> com.cloudera.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3779)
> at 
> com.cloudera.com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1107)
> at 
> com.cloudera.com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1070)
> at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:239)
> at 
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2711)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:97)
> at 
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2748)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2730)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:385)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
> at 
> parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:385)
> at 
> parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:162)
> at 
> parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:145)
> at 
> org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.(SqlNewHadoopRDD.scala:180)
> at 
> org.apache.spark.rdd.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:126)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:229)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 
> Can anyone help?
> 
> Cheers,
> Ben
> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> <mailto:user-unsubscr...@spark.apache.org>
> 
> 



Re: Get S3 Parquet File

2017-02-23 Thread Benjamin Kim
Aakash,

Here is a code snippet for the keys.

val accessKey = “---"
val secretKey = “---"

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", accessKey)
hadoopConf.set("fs.s3a.secret.key", secretKey)
hadoopConf.set("spark.hadoop.fs.s3a.access.key",accessKey)
hadoopConf.set("spark.hadoop.fs.s3a.secret.key",secretKey)

val df = 
sqlContext.read.parquet("s3a://aps.optus/uc2/BI_URL_DATA_HLY_20170201_09.PARQUET.gz")
df.show
df.count

When we do the count, then the error happens.

Thanks,
Ben


> On Feb 23, 2017, at 10:31 AM, Aakash Basu <aakash.spark@gmail.com> wrote:
> 
> Hey,
> 
> Please recheck your access key and secret key being used to fetch the parquet 
> file. It seems to be a credential error. Either mismatch/load. If load, then 
> first use it directly in code and see if the issue resolves, then it can be 
> hidden and read from Input Params.
> 
> Thanks,
> Aakash.
> 
> 
> On 23-Feb-2017 11:54 PM, "Benjamin Kim" <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> We are trying to use Spark 1.6 within CDH 5.7.1 to retrieve a 1.3GB Parquet 
> file from AWS S3. We can read the schema and show some data when the file is 
> loaded into a DataFrame, but when we try to do some operations, such as 
> count, we get this error below.
> 
> com.cloudera.com.amazonaws.AmazonClientException: Unable to load AWS 
> credentials from any provider in the chain
> at 
> com.cloudera.com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
> at 
> com.cloudera.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3779)
> at 
> com.cloudera.com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1107)
> at 
> com.cloudera.com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1070)
> at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:239)
> at 
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2711)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:97)
> at 
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2748)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2730)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:385)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
> at 
> parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:385)
> at 
> parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:162)
> at 
> parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:145)
> at 
> org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.(SqlNewHadoopRDD.scala:180)
> at 
> org.apache.spark.rdd.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:126)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:229)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 
> Can anyone help?
> 
> Cheers,
> Ben
> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> <mailto:user-unsubscr...@spark.apache.org>
> 
> 



Get S3 Parquet File

2017-02-23 Thread Benjamin Kim
We are trying to use Spark 1.6 within CDH 5.7.1 to retrieve a 1.3GB Parquet 
file from AWS S3. We can read the schema and show some data when the file is 
loaded into a DataFrame, but when we try to do some operations, such as count, 
we get this error below.

com.cloudera.com.amazonaws.AmazonClientException: Unable to load AWS 
credentials from any provider in the chain
at 
com.cloudera.com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
at 
com.cloudera.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3779)
at 
com.cloudera.com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1107)
at 
com.cloudera.com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1070)
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:239)
at 
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2711)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:97)
at 
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2748)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2730)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:385)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at 
parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:385)
at 
parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:162)
at 
parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:145)
at 
org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.(SqlNewHadoopRDD.scala:180)
at 
org.apache.spark.rdd.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:126)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:229)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Can anyone help?

Cheers,
Ben


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Parquet Gzipped Files

2017-02-14 Thread Benjamin Kim
Jörn,

I agree with you, but the vendor is a little difficult to work with. For now, I 
will try to decompress it from S3 and save it plainly into HDFS. If someone 
already has this example, please let me know.
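
In case it is useful to anyone else, a minimal sketch of that 
decompress-and-land step might look like this (assuming boto3 and the hdfs CLI 
are available; the bucket, key, and paths are placeholders):

import gzip
import shutil
import subprocess

import boto3

bucket = "vendor-bucket"
key = "incoming/data.parquet.gz"
local_gz = "/tmp/data.parquet.gz"
local_parquet = "/tmp/data.parquet"
hdfs_target = "/landing/data.parquet"

# 1. Pull the gzipped file down from S3.
boto3.client("s3").download_file(bucket, key, local_gz)

# 2. Gunzip it locally so the Parquet footer is readable again.
with gzip.open(local_gz, "rb") as src, open(local_parquet, "wb") as dst:
    shutil.copyfileobj(src, dst)

# 3. Push the plain Parquet file into HDFS.
subprocess.check_call(["hdfs", "dfs", "-put", "-f", local_parquet, hdfs_target])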

Cheers,
Ben


> On Feb 13, 2017, at 9:50 AM, Jörn Franke <jornfra...@gmail.com> wrote:
> 
> Your vendor should use the parquet internal compression and not take a 
> parquet file and gzip it.
> 
>> On 13 Feb 2017, at 18:48, Benjamin Kim <bbuil...@gmail.com> wrote:
>> 
>> We are receiving files from an outside vendor who creates a Parquet data 
>> file and Gzips it before delivery. Does anyone know how to Gunzip the file 
>> in Spark and inject the Parquet data into a DataFrame? I thought using 
>> sc.textFile or sc.wholeTextFiles would automatically Gunzip the file, but 
>> I’m getting a decompression header error when trying to open the Parquet 
>> file.
>> 
>> Thanks,
>> Ben
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Parquet Gzipped Files

2017-02-13 Thread Benjamin Kim
We are receiving files from an outside vendor who creates a Parquet data file 
and Gzips it before delivery. Does anyone know how to Gunzip the file in Spark 
and inject the Parquet data into a DataFrame? I thought using sc.textFile or 
sc.wholeTextFiles would automatically Gunzip the file, but I’m getting a 
decompression header error when trying to open the Parquet file.

Thanks,
Ben
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Remove dependence on HDFS

2017-02-11 Thread Benjamin Kim
Has anyone got some advice on how to remove the reliance on HDFS for storing 
persistent data. We have an on-premise Spark cluster. It seems like a waste of 
resources to keep adding nodes because of a lack of storage space only. I would 
rather add more powerful nodes due to the lack of processing power at a less 
frequent rate, than add less powerful nodes at a more frequent rate just to 
handle the ever growing data. Can anyone point me in the right direction? Is 
Alluxio a good solution? S3? I would like to hear your thoughts.

Cheers,
Ben 
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: HBase Spark

2017-02-03 Thread Benjamin Kim
Asher,

I found a profile for Scala 2.11 and removed it. Now, it brings in 2.10. I ran 
some code and got further. Now, I get the error below when I do a “df.show”.

java.lang.AbstractMethodError
at org.apache.spark.Logging$class.log(Logging.scala:50)
at 
org.apache.spark.sql.execution.datasources.hbase.HBaseFilter$.log(HBaseFilter.scala:122)
at 
org.apache.spark.sql.execution.datasources.hbase.HBaseFilter$.buildFilters(HBaseFilter.scala:125)
at 
org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD.getPartitions(HBaseTableScan.scala:59)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)

Thanks for all your help.

Cheers,
Ben


> On Feb 3, 2017, at 8:16 AM, Asher Krim <ak...@hubspot.com> wrote:
> 
> Did you check the actual maven dep tree? Something might be pulling in a 
> different version. Also, if you're seeing this locally, you might want to 
> check which version of the scala sdk your IDE is using
> 
> Asher Krim
> Senior Software Engineer
> 
> 
> On Thu, Feb 2, 2017 at 5:43 PM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Hi Asher,
> 
> I modified the pom to be the same Spark (1.6.0), HBase (1.2.0), and Java 
> (1.8) version as our installation. The Scala (2.10.5) version is already the 
> same as ours. But I’m still getting the same error. Can you think of anything 
> else?
> 
> Cheers,
> Ben
> 
> 
>> On Feb 2, 2017, at 11:06 AM, Asher Krim <ak...@hubspot.com 
>> <mailto:ak...@hubspot.com>> wrote:
>> 
>> Ben,
>> 
>> That looks like a scala version mismatch. Have you checked your dep tree?
>> 
>> Asher Krim
>> Senior Software Engineer
>> 
>> 
>> On Thu, Feb 2, 2017 at 1:28 PM, Benjamin Kim <bbuil...@gmail.com 
>> <mailto:bbuil...@gmail.com>> wrote:
>> Elek,
>> 
>> Can you give me some sample code? I can’t get mine to work.
>> 
>> import org.apache.spark.sql.{SQLContext, _}
>> import org.apache.spark.sql.execution.datasources.hbase._
>> import org.apache.spark.{SparkConf, SparkContext}
>> 
>> def cat = s"""{
>> |"table":{"namespace":"ben", "name":"dmp_test", 
>> "tableCoder":"PrimitiveType"},
>> |"rowkey":"key",
>> |"columns":{
>> |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
>> |"col1":{"cf":"d", "col":"google_gid", "type":"string"}
>> |}
>> |}""".stripMargin
>> 
>> import sqlContext.implicits._
>> 
>> def withCatalog(cat: String): DataFrame = {
>> sqlContext
>> .read
>> .options(Map(HBaseTableCatalog.tableCatalog->cat))
>> .format("org.apache.spark.sql.execution.datasources.hbase")
>> .load()
>> }
>> 
>> val df = withCatalog(cat)
>> df.show
>> 
>> It gives me this error.
>> 
>> java.lang.NoSuchMethodError: 
>> scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
>>  at 
>> org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:232)
>>  at 
>> org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.(HBaseRelation.scala:77)
>>  at 
>> org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:51)
>>  at 
>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
>>  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
>> 
>> If you can please help, I would be grateful.
>> 
>> Cheers,
>> Ben
>> 
>> 
>>> On Jan 31, 2017, at 1:02 PM, Marton, Elek <h...@anzix.net 
>>> <mailto:h...@anzix.net>> wrote:
>>> 
>>> 
>>> I tested this one with hbase 1.2.4:
>>> 
>>> https://github.com/hortonworks-spark/shc 
>>> <https://github.com/hortonworks-spark/shc>
>>> 
>>> Marton
>>> 
>>> On 01/31/2017 09:17 PM, Benjamin Kim wrote:
>>>> Does anyone know how to backport the HBase Spark module to HBase 1.2.0? I 
>>>> tried to build it from source, but I cannot get it to work.
>>>> 
>>>> Thanks,
>>>> Ben
>>>> -
>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>>>> <mailto:user-unsubscr...@spark.apache.org>
>>>> 
>>> 
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>>> <mailto:user-unsubscr...@spark.apache.org>
>>> 
>> 
>> 
> 
> 



Re: HBase Spark

2017-02-03 Thread Benjamin Kim
Asher,

You’re right. I don’t see anything but 2.11 being pulled in. Do you know where 
I can change this?

Cheers,
Ben


> On Feb 3, 2017, at 10:50 AM, Asher Krim <ak...@hubspot.com> wrote:
> 
> Sorry for my persistence, but did you actually run "mvn dependency:tree 
> -Dverbose=true"? And did you see only scala 2.10.5 being pulled in?
> 
> On Fri, Feb 3, 2017 at 12:33 PM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Asher,
> 
> It’s still the same. Do you have any other ideas?
> 
> Cheers,
> Ben
> 
> 
>> On Feb 3, 2017, at 8:16 AM, Asher Krim <ak...@hubspot.com 
>> <mailto:ak...@hubspot.com>> wrote:
>> 
>> Did you check the actual maven dep tree? Something might be pulling in a 
>> different version. Also, if you're seeing this locally, you might want to 
>> check which version of the scala sdk your IDE is using
>> 
>> Asher Krim
>> Senior Software Engineer
>> 
>> 
>> On Thu, Feb 2, 2017 at 5:43 PM, Benjamin Kim <bbuil...@gmail.com 
>> <mailto:bbuil...@gmail.com>> wrote:
>> Hi Asher,
>> 
>> I modified the pom to be the same Spark (1.6.0), HBase (1.2.0), and Java 
>> (1.8) version as our installation. The Scala (2.10.5) version is already the 
>> same as ours. But I’m still getting the same error. Can you think of 
>> anything else?
>> 
>> Cheers,
>> Ben
>> 
>> 
>>> On Feb 2, 2017, at 11:06 AM, Asher Krim <ak...@hubspot.com 
>>> <mailto:ak...@hubspot.com>> wrote:
>>> 
>>> Ben,
>>> 
>>> That looks like a scala version mismatch. Have you checked your dep tree?
>>> 
>>> Asher Krim
>>> Senior Software Engineer
>>> 
>>> 
>>> On Thu, Feb 2, 2017 at 1:28 PM, Benjamin Kim <bbuil...@gmail.com 
>>> <mailto:bbuil...@gmail.com>> wrote:
>>> Elek,
>>> 
>>> Can you give me some sample code? I can’t get mine to work.
>>> 
>>> import org.apache.spark.sql.{SQLContext, _}
>>> import org.apache.spark.sql.execution.datasources.hbase._
>>> import org.apache.spark.{SparkConf, SparkContext}
>>> 
>>> def cat = s"""{
>>> |"table":{"namespace":"ben", "name":"dmp_test", 
>>> "tableCoder":"PrimitiveType"},
>>> |"rowkey":"key",
>>> |"columns":{
>>> |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
>>> |"col1":{"cf":"d", "col":"google_gid", "type":"string"}
>>> |}
>>> |}""".stripMargin
>>> 
>>> import sqlContext.implicits._
>>> 
>>> def withCatalog(cat: String): DataFrame = {
>>> sqlContext
>>> .read
>>> .options(Map(HBaseTableCatalog.tableCatalog->cat))
>>> .format("org.apache.spark.sql.execution.datasources.hbase")
>>> .load()
>>> }
>>> 
>>> val df = withCatalog(cat)
>>> df.show
>>> 
>>> It gives me this error.
>>> 
>>> java.lang.NoSuchMethodError: 
>>> scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
>>> at 
>>> org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:232)
>>> at 
>>> org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.(HBaseRelation.scala:77)
>>> at 
>>> org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:51)
>>> at 
>>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
>>> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
>>> 
>>> If you can please help, I would be grateful.
>>> 
>>> Cheers,
>>> Ben
>>> 
>>> 
>>>> On Jan 31, 2017, at 1:02 PM, Marton, Elek <h...@anzix.net 
>>>> <mailto:h...@anzix.net>> wrote:
>>>> 
>>>> 
>>>> I tested this one with hbase 1.2.4:
>>>> 
>>>> https://github.com/hortonworks-spark/shc 
>>>> <https://github.com/hortonworks-spark/shc>
>>>> 
>>>> Marton
>>>> 
>>>> On 01/31/2017 09:17 PM, Benjamin Kim wrote:
>>>>> Does anyone know how to backport the HBase Spark module to HBase 1.2.0? I 
>>>>> tried to build it from source, but I cannot get it to work.
>>>>> 
>>>>> Thanks,
>>>>> Ben
>>>>> -
>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>>>>> <mailto:user-unsubscr...@spark.apache.org>
>>>>> 
>>>> 
>>>> -
>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>>>> <mailto:user-unsubscr...@spark.apache.org>
>>>> 
>>> 
>>> 
>> 
>> 
> 
> 



Re: HBase Spark

2017-02-03 Thread Benjamin Kim
Asher,

It’s still the same. Do you have any other ideas?

Cheers,
Ben


> On Feb 3, 2017, at 8:16 AM, Asher Krim <ak...@hubspot.com> wrote:
> 
> Did you check the actual maven dep tree? Something might be pulling in a 
> different version. Also, if you're seeing this locally, you might want to 
> check which version of the scala sdk your IDE is using
> 
> Asher Krim
> Senior Software Engineer
> 
> 
> On Thu, Feb 2, 2017 at 5:43 PM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Hi Asher,
> 
> I modified the pom to be the same Spark (1.6.0), HBase (1.2.0), and Java 
> (1.8) version as our installation. The Scala (2.10.5) version is already the 
> same as ours. But I’m still getting the same error. Can you think of anything 
> else?
> 
> Cheers,
> Ben
> 
> 
>> On Feb 2, 2017, at 11:06 AM, Asher Krim <ak...@hubspot.com 
>> <mailto:ak...@hubspot.com>> wrote:
>> 
>> Ben,
>> 
>> That looks like a scala version mismatch. Have you checked your dep tree?
>> 
>> Asher Krim
>> Senior Software Engineer
>> 
>> 
>> On Thu, Feb 2, 2017 at 1:28 PM, Benjamin Kim <bbuil...@gmail.com 
>> <mailto:bbuil...@gmail.com>> wrote:
>> Elek,
>> 
>> Can you give me some sample code? I can’t get mine to work.
>> 
>> import org.apache.spark.sql.{SQLContext, _}
>> import org.apache.spark.sql.execution.datasources.hbase._
>> import org.apache.spark.{SparkConf, SparkContext}
>> 
>> def cat = s"""{
>> |"table":{"namespace":"ben", "name":"dmp_test", 
>> "tableCoder":"PrimitiveType"},
>> |"rowkey":"key",
>> |"columns":{
>> |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
>> |"col1":{"cf":"d", "col":"google_gid", "type":"string"}
>> |}
>> |}""".stripMargin
>> 
>> import sqlContext.implicits._
>> 
>> def withCatalog(cat: String): DataFrame = {
>> sqlContext
>> .read
>> .options(Map(HBaseTableCatalog.tableCatalog->cat))
>> .format("org.apache.spark.sql.execution.datasources.hbase")
>> .load()
>> }
>> 
>> val df = withCatalog(cat)
>> df.show
>> 
>> It gives me this error.
>> 
>> java.lang.NoSuchMethodError: 
>> scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
>>  at 
>> org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:232)
>>  at 
>> org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.(HBaseRelation.scala:77)
>>  at 
>> org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:51)
>>  at 
>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
>>  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
>> 
>> If you can please help, I would be grateful.
>> 
>> Cheers,
>> Ben
>> 
>> 
>>> On Jan 31, 2017, at 1:02 PM, Marton, Elek <h...@anzix.net 
>>> <mailto:h...@anzix.net>> wrote:
>>> 
>>> 
>>> I tested this one with hbase 1.2.4:
>>> 
>>> https://github.com/hortonworks-spark/shc 
>>> <https://github.com/hortonworks-spark/shc>
>>> 
>>> Marton
>>> 
>>> On 01/31/2017 09:17 PM, Benjamin Kim wrote:
>>>> Does anyone know how to backport the HBase Spark module to HBase 1.2.0? I 
>>>> tried to build it from source, but I cannot get it to work.
>>>> 
>>>> Thanks,
>>>> Ben
>>>> -
>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>>>> <mailto:user-unsubscr...@spark.apache.org>
>>>> 
>>> 
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>>> <mailto:user-unsubscr...@spark.apache.org>
>>> 
>> 
>> 
> 
> 



Re: HBase Spark

2017-02-03 Thread Benjamin Kim
I'll clean up any .m2 or .ivy directories. And try again.

I ran this on our lab cluster for testing.

Cheers,
Ben


On Fri, Feb 3, 2017 at 8:16 AM Asher Krim <ak...@hubspot.com> wrote:

> Did you check the actual maven dep tree? Something might be pulling in a
> different version. Also, if you're seeing this locally, you might want to
> check which version of the scala sdk your IDE is using
>
> Asher Krim
> Senior Software Engineer
>
> On Thu, Feb 2, 2017 at 5:43 PM, Benjamin Kim <bbuil...@gmail.com> wrote:
>
> Hi Asher,
>
> I modified the pom to be the same Spark (1.6.0), HBase (1.2.0), and Java
> (1.8) version as our installation. The Scala (2.10.5) version is already
> the same as ours. But I’m still getting the same error. Can you think of
> anything else?
>
> Cheers,
> Ben
>
>
> On Feb 2, 2017, at 11:06 AM, Asher Krim <ak...@hubspot.com> wrote:
>
> Ben,
>
> That looks like a scala version mismatch. Have you checked your dep tree?
>
> Asher Krim
> Senior Software Engineer
>
> On Thu, Feb 2, 2017 at 1:28 PM, Benjamin Kim <bbuil...@gmail.com> wrote:
>
> Elek,
>
> Can you give me some sample code? I can’t get mine to work.
>
> import org.apache.spark.sql.{SQLContext, _}
> import org.apache.spark.sql.execution.datasources.hbase._
> import org.apache.spark.{SparkConf, SparkContext}
>
> def cat = s"""{
> |"table":{"namespace":"ben", "name":"dmp_test",
> "tableCoder":"PrimitiveType"},
> |"rowkey":"key",
> |"columns":{
> |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
> |"col1":{"cf":"d", "col":"google_gid", "type":"string"}
> |}
> |}""".stripMargin
>
> import sqlContext.implicits._
>
> def withCatalog(cat: String): DataFrame = {
> sqlContext
> .read
> .options(Map(HBaseTableCatalog.tableCatalog->cat))
> .format("org.apache.spark.sql.execution.datasources.hbase")
> .load()
> }
>
> val df = withCatalog(cat)
> df.show
>
>
> It gives me this error.
>
> java.lang.NoSuchMethodError:
> scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
> at
> org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:232)
> at
> org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.(HBaseRelation.scala:77)
> at
> org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:51)
> at
> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
>
>
> If you can please help, I would be grateful.
>
> Cheers,
> Ben
>
>
> On Jan 31, 2017, at 1:02 PM, Marton, Elek <h...@anzix.net> wrote:
>
>
> I tested this one with hbase 1.2.4:
>
> https://github.com/hortonworks-spark/shc
>
> Marton
>
> On 01/31/2017 09:17 PM, Benjamin Kim wrote:
>
> Does anyone know how to backport the HBase Spark module to HBase 1.2.0? I
> tried to build it from source, but I cannot get it to work.
>
> Thanks,
> Ben
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>
>
>
>


Re: HBase Spark

2017-02-02 Thread Benjamin Kim
Hi Asher,

I modified the pom to be the same Spark (1.6.0), HBase (1.2.0), and Java (1.8) 
version as our installation. The Scala (2.10.5) version is already the same as 
ours. But I’m still getting the same error. Can you think of anything else?

Cheers,
Ben


> On Feb 2, 2017, at 11:06 AM, Asher Krim <ak...@hubspot.com> wrote:
> 
> Ben,
> 
> That looks like a scala version mismatch. Have you checked your dep tree?
> 
> Asher Krim
> Senior Software Engineer
> 
> 
> On Thu, Feb 2, 2017 at 1:28 PM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Elek,
> 
> Can you give me some sample code? I can’t get mine to work.
> 
> import org.apache.spark.sql.{SQLContext, _}
> import org.apache.spark.sql.execution.datasources.hbase._
> import org.apache.spark.{SparkConf, SparkContext}
> 
> def cat = s"""{
> |"table":{"namespace":"ben", "name":"dmp_test", 
> "tableCoder":"PrimitiveType"},
> |"rowkey":"key",
> |"columns":{
> |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
> |"col1":{"cf":"d", "col":"google_gid", "type":"string"}
> |}
> |}""".stripMargin
> 
> import sqlContext.implicits._
> 
> def withCatalog(cat: String): DataFrame = {
> sqlContext
> .read
> .options(Map(HBaseTableCatalog.tableCatalog->cat))
> .format("org.apache.spark.sql.execution.datasources.hbase")
> .load()
> }
> 
> val df = withCatalog(cat)
> df.show
> 
> It gives me this error.
> 
> java.lang.NoSuchMethodError: 
> scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
>   at 
> org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:232)
>   at 
> org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.(HBaseRelation.scala:77)
>   at 
> org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:51)
>   at 
> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
> 
> If you can please help, I would be grateful.
> 
> Cheers,
> Ben
> 
> 
>> On Jan 31, 2017, at 1:02 PM, Marton, Elek <h...@anzix.net 
>> <mailto:h...@anzix.net>> wrote:
>> 
>> 
>> I tested this one with hbase 1.2.4:
>> 
>> https://github.com/hortonworks-spark/shc 
>> <https://github.com/hortonworks-spark/shc>
>> 
>> Marton
>> 
>> On 01/31/2017 09:17 PM, Benjamin Kim wrote:
>>> Does anyone know how to backport the HBase Spark module to HBase 1.2.0? I 
>>> tried to build it from source, but I cannot get it to work.
>>> 
>>> Thanks,
>>> Ben
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>>> <mailto:user-unsubscr...@spark.apache.org>
>>> 
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>> <mailto:user-unsubscr...@spark.apache.org>
>> 
> 
> 



Re: HBase Spark

2017-02-02 Thread Benjamin Kim
Elek,

Can you give me some sample code? I can’t get mine to work.

import org.apache.spark.sql.{SQLContext, _}
import org.apache.spark.sql.execution.datasources.hbase._
import org.apache.spark.{SparkConf, SparkContext}

def cat = s"""{
|"table":{"namespace":"ben", "name":"dmp_test", 
"tableCoder":"PrimitiveType"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"d", "col":"google_gid", "type":"string"}
|}
|}""".stripMargin

import sqlContext.implicits._

def withCatalog(cat: String): DataFrame = {
sqlContext
.read
.options(Map(HBaseTableCatalog.tableCatalog->cat))
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()
}

val df = withCatalog(cat)
df.show

It gives me this error.

java.lang.NoSuchMethodError: 
scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
at 
org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:232)
at 
org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.(HBaseRelation.scala:77)
at 
org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:51)
at 
org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)

If you can please help, I would be grateful.

Cheers,
Ben


> On Jan 31, 2017, at 1:02 PM, Marton, Elek <h...@anzix.net> wrote:
> 
> 
> I tested this one with hbase 1.2.4:
> 
> https://github.com/hortonworks-spark/shc
> 
> Marton
> 
> On 01/31/2017 09:17 PM, Benjamin Kim wrote:
>> Does anyone know how to backport the HBase Spark module to HBase 1.2.0? I 
>> tried to build it from source, but I cannot get it to work.
>> 
>> Thanks,
>> Ben
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 



Re: HBase Spark

2017-01-31 Thread Benjamin Kim
Elek,

If I cannot use the HBase Spark module, then I’ll give it a try.

Thanks,
Ben


> On Jan 31, 2017, at 1:02 PM, Marton, Elek <h...@anzix.net> wrote:
> 
> 
> I tested this one with hbase 1.2.4:
> 
> https://github.com/hortonworks-spark/shc
> 
> Marton
> 
> On 01/31/2017 09:17 PM, Benjamin Kim wrote:
>> Does anyone know how to backport the HBase Spark module to HBase 1.2.0? I 
>> tried to build it from source, but I cannot get it to work.
>> 
>> Thanks,
>> Ben
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



HBase Spark

2017-01-31 Thread Benjamin Kim
Does anyone know how to backport the HBase Spark module to HBase 1.2.0? I tried 
to build it from source, but I cannot get it to work.

Thanks,
Ben
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Merging Parquet Files

2016-12-22 Thread Benjamin Kim
Thanks, Hyukjin.

I’ll try using the Parquet tools for 1.9

On Dec 23, 2016, at 12:43 PM, Hyukjin Kwon <gurwls...@gmail.com> wrote:

Hi Benjamin,


As you might already know, I believe the Hadoop command automatically does
not merge the column-based format such as ORC or Parquet but just simply
concatenates them.

I haven't tried this by myself but I remember I saw a JIRA in Parquet -
https://issues.apache.org/jira/browse/PARQUET-460

It seems parquet-tools allows merge small Parquet files into one.


Also, I believe there are command-line tools in Kite -
https://github.com/kite-sdk/kite

This might be useful.


Thanks!

2016-12-23 7:01 GMT+09:00 Benjamin Kim <bbuil...@gmail.com>:

Has anyone tried to merge *.gz.parquet files before? I'm trying to merge
them into 1 file after they are output from Spark. Doing a coalesce(1) on
the Spark cluster will not work. It just does not have the resources to do
it. I'm trying to do it using the commandline and not use Spark. I will use
this command in shell script. I tried "hdfs dfs -getmerge", but the file
becomes unreadable by Spark with gzip footer error.

Thanks,
Ben

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: Merging Parquet Files

2016-12-22 Thread Benjamin Kim
Thanks, Hyukjin.

I’ll try using the Parquet tools for 1.9 based on the jira. If that doesn’t 
work, I’ll try Kite.

Cheers,
Ben


> On Dec 23, 2016, at 12:43 PM, Hyukjin Kwon <gurwls...@gmail.com> wrote:
> 
> Hi Benjamin,
> 
> 
> As you might already know, I believe the Hadoop command automatically does 
> not merge the column-based format such as ORC or Parquet but just simply 
> concatenates them.
> 
> I haven't tried this by myself but I remember I saw a JIRA in Parquet - 
> https://issues.apache.org/jira/browse/PARQUET-460 
> <https://issues.apache.org/jira/browse/PARQUET-460>
> 
> It seems parquet-tools allows merge small Parquet files into one. 
> 
> 
> Also, I believe there are command-line tools in Kite - 
> https://github.com/kite-sdk/kite <https://github.com/kite-sdk/kite>
> 
> This might be useful.
> 
> 
> Thanks!
> 
> 2016-12-23 7:01 GMT+09:00 Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>>:
> Has anyone tried to merge *.gz.parquet files before? I'm trying to merge them 
> into 1 file after they are output from Spark. Doing a coalesce(1) on the 
> Spark cluster will not work. It just does not have the resources to do it. 
> I'm trying to do it using the commandline and not use Spark. I will use this 
> command in shell script. I tried "hdfs dfs -getmerge", but the file becomes 
> unreadable by Spark with gzip footer error.
> 
> Thanks,
> Ben
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> <mailto:user-unsubscr...@spark.apache.org>
> 
> 



Merging Parquet Files

2016-12-22 Thread Benjamin Kim
Has anyone tried to merge *.gz.parquet files before? I'm trying to merge them 
into 1 file after they are output from Spark. Doing a coalesce(1) on the Spark 
cluster will not work. It just does not have the resources to do it. I'm trying 
to do it using the commandline and not use Spark. I will use this command in 
shell script. I tried "hdfs dfs -getmerge", but the file becomes unreadable by 
Spark with gzip footer error.

Thanks,
Ben
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Deep learning libraries for scala

2016-11-01 Thread Benjamin Kim
To add, I see that Databricks has been busy integrating deep learning more into 
their product and put out a new article about this.

https://databricks.com/blog/2016/10/27/gpu-acceleration-in-databricks.html 
<https://databricks.com/blog/2016/10/27/gpu-acceleration-in-databricks.html>

An interesting tidbit is at the bottom of the article mentioning TensorFrames.

https://github.com/databricks/tensorframes 
<https://github.com/databricks/tensorframes>

Seems like an interesting direction…

Cheers,
Ben


> On Oct 19, 2016, at 9:05 AM, janardhan shetty <janardhan...@gmail.com> wrote:
> 
> Agreed. But as it states deeper integration with (scala) is yet to be 
> developed. 
> Any thoughts on how to use tensorflow with scala ? Need to write wrappers I 
> think.
> 
> 
> On Oct 19, 2016 7:56 AM, "Benjamin Kim" <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> On that note, here is an article that Databricks made regarding using 
> Tensorflow in conjunction with Spark.
> 
> https://databricks.com/blog/2016/01/25/deep-learning-with-apache-spark-and-tensorflow.html
>  
> <https://databricks.com/blog/2016/01/25/deep-learning-with-apache-spark-and-tensorflow.html>
> 
> Cheers,
> Ben
> 
> 
>> On Oct 19, 2016, at 3:09 AM, Gourav Sengupta <gourav.sengu...@gmail.com 
>> <mailto:gourav.sengu...@gmail.com>> wrote:
>> 
>> while using Deep Learning you might want to stay as close to tensorflow as 
>> possible. There is very less translation loss, you get to access stable, 
>> scalable and tested libraries from the best brains in the industry and as 
>> far as Scala goes, it helps a lot to think about using the language as a 
>> tool to access algorithms in this instance unless you want to start 
>> developing algorithms from grounds up ( and in which case you might not 
>> require any libraries at all).
>> 
>> On Sat, Oct 1, 2016 at 3:30 AM, janardhan shetty <janardhan...@gmail.com 
>> <mailto:janardhan...@gmail.com>> wrote:
>> Hi,
>> 
>> Are there any good libraries which can be used for scala deep learning 
>> models ?
>> How can we integrate tensorflow with scala ML ?
>> 
> 



Spark Streaming and Kinesis

2016-10-27 Thread Benjamin Kim
Has anyone worked with AWS Kinesis and retrieved data from it using Spark 
Streaming? I am having issues where it’s returning no data. I can connect to 
the Kinesis stream and describe using Spark. Is there something I’m missing? 
Are there specific IAM security settings needed? I just simply followed the 
Word Count ASL example. When it didn’t work, I even tried to run the code 
independently in Spark shell in yarn-client mode by hardcoding the arguments. 
Still, there was no data even with the setting InitialPositionInStream.LATEST 
changed to InitialPositionInStream.TRIM_HORIZON.

If anyone can help, I would truly appreciate it.

Thanks,
Ben
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Deep learning libraries for scala

2016-10-19 Thread Benjamin Kim
On that note, here is an article that Databricks made regarding using 
Tensorflow in conjunction with Spark.

https://databricks.com/blog/2016/01/25/deep-learning-with-apache-spark-and-tensorflow.html

Cheers,
Ben


> On Oct 19, 2016, at 3:09 AM, Gourav Sengupta  
> wrote:
> 
> while using Deep Learning you might want to stay as close to tensorflow as 
> possible. There is very less translation loss, you get to access stable, 
> scalable and tested libraries from the best brains in the industry and as far 
> as Scala goes, it helps a lot to think about using the language as a tool to 
> access algorithms in this instance unless you want to start developing 
> algorithms from grounds up ( and in which case you might not require any 
> libraries at all).
> 
> On Sat, Oct 1, 2016 at 3:30 AM, janardhan shetty  > wrote:
> Hi,
> 
> Are there any good libraries which can be used for scala deep learning models 
> ?
> How can we integrate tensorflow with scala ML ?
> 



Re: Spark SQL Thriftserver with HBase

2016-10-17 Thread Benjamin Kim
This will give me an opportunity to start using Structured Streaming. Then, I 
can try adding more functionality. If all goes well, then we could transition 
off of HBase to a more in-memory data solution that can “spill-over” data for 
us.

> On Oct 17, 2016, at 11:53 AM, vincent gromakowski 
> <vincent.gromakow...@gmail.com> wrote:
> 
> Instead of (or additionally to) saving results somewhere, you just start a 
> thriftserver that expose the Spark tables of the SQLContext (or SparkSession 
> now). That means you can implement any logic (and maybe use structured 
> streaming) to expose your data. Today using the thriftserver means reading 
> data from the persistent store every query, so if the data modeling doesn't 
> fit the query it can be quite long.  What you generally do in a common spark 
> job is to load the data and cache spark table in a in-memory columnar table 
> which is quite efficient for any kind of query, the counterpart is that the 
> cache isn't updated you have to implement a reload mechanism, and this 
> solution isn't available using the thriftserver.
> What I propose is to mix the two world: periodically/delta load data in spark 
> table cache and expose it through the thriftserver. But you have to implement 
> the loading logic, it can be very simple to very complex depending on your 
> needs.
> 
> 
> 2016-10-17 19:48 GMT+02:00 Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>>:
> Is this technique similar to what Kinesis is offering or what Structured 
> Streaming is going to have eventually?
> 
> Just curious.
> 
> Cheers,
> Ben
> 
>  
>> On Oct 17, 2016, at 10:14 AM, vincent gromakowski 
>> <vincent.gromakow...@gmail.com <mailto:vincent.gromakow...@gmail.com>> wrote:
>> 
>> I would suggest to code your own Spark thriftserver which seems to be very 
>> easy.
>> http://stackoverflow.com/questions/27108863/accessing-spark-sql-rdd-tables-through-the-thrift-server
>>  
>> <http://stackoverflow.com/questions/27108863/accessing-spark-sql-rdd-tables-through-the-thrift-server>
>> 
>> I am starting to test it. The big advantage is that you can implement any 
>> logic because it's a spark job and then start a thrift server on temporary 
>> table. For example you can query a micro batch rdd from a kafka stream, or 
>> pre load some tables and implement a rolling cache to periodically update 
>> the spark in memory tables with persistent store...
>> It's not part of the public API and I don't know yet what are the issues 
>> doing this but I think Spark community should look at this path: making the 
>> thriftserver be instantiable in any spark job.
>> 
>> 2016-10-17 18:17 GMT+02:00 Michael Segel <msegel_had...@hotmail.com 
>> <mailto:msegel_had...@hotmail.com>>:
>> Guys, 
>> Sorry for jumping in late to the game… 
>> 
>> If memory serves (which may not be a good thing…) :
>> 
>> You can use HiveServer2 as a connection point to HBase.  
>> While this doesn’t perform well, its probably the cleanest solution. 
>> I’m not keen on Phoenix… wouldn’t recommend it…. 
>> 
>> 
>> The issue is that you’re trying to make HBase, a key/value object store, a 
>> Relational Engine… its not. 
>> 
>> There are some considerations which make HBase not ideal for all use cases 
>> and you may find better performance with Parquet files. 
>> 
>> One thing missing is the use of secondary indexing and query optimizations 
>> that you have in RDBMSs and are lacking in HBase / MapRDB / etc …  so your 
>> performance will vary. 
>> 
>> With respect to Tableau… their entire interface in to the big data world 
>> revolves around the JDBC/ODBC interface. So if you don’t have that piece as 
>> part of your solution, you’re DOA w respect to Tableau. 
>> 
>> Have you considered Drill as your JDBC connection point?  (YAAP: Yet another 
>> Apache project) 
>> 
>> 
>>> On Oct 9, 2016, at 12:23 PM, Benjamin Kim <bbuil...@gmail.com 
>>> <mailto:bbuil...@gmail.com>> wrote:
>>> 
>>> Thanks for all the suggestions. It would seem you guys are right about the 
>>> Tableau side of things. The reports don’t need to be real-time, and they 
>>> won’t be directly feeding off of the main DMP HBase data. Instead, it’ll be 
>>> batched to Parquet or Kudu/Impala or even PostgreSQL.
>>> 
>>> I originally thought that we needed two-way data retrieval from the DMP 
>>> HBase for ID generation, but after further investigation into the use-case 
>>> and architecture, the ID generation needs to happen 

Re: Spark SQL Thriftserver with HBase

2016-10-17 Thread Benjamin Kim
Is this technique similar to what Kinesis is offering or what Structured 
Streaming is going to have eventually?

Just curious.

Cheers,
Ben

 
> On Oct 17, 2016, at 10:14 AM, vincent gromakowski 
> <vincent.gromakow...@gmail.com> wrote:
> 
> I would suggest to code your own Spark thriftserver which seems to be very 
> easy.
> http://stackoverflow.com/questions/27108863/accessing-spark-sql-rdd-tables-through-the-thrift-server
>  
> <http://stackoverflow.com/questions/27108863/accessing-spark-sql-rdd-tables-through-the-thrift-server>
> 
> I am starting to test it. The big advantage is that you can implement any 
> logic because it's a spark job and then start a thrift server on temporary 
> table. For example you can query a micro batch rdd from a kafka stream, or 
> pre load some tables and implement a rolling cache to periodically update the 
> spark in memory tables with persistent store...
> It's not part of the public API and I don't know yet what are the issues 
> doing this but I think Spark community should look at this path: making the 
> thriftserver be instantiable in any spark job.
> 
> 2016-10-17 18:17 GMT+02:00 Michael Segel <msegel_had...@hotmail.com 
> <mailto:msegel_had...@hotmail.com>>:
> Guys, 
> Sorry for jumping in late to the game… 
> 
> If memory serves (which may not be a good thing…) :
> 
> You can use HiveServer2 as a connection point to HBase.  
> While this doesn’t perform well, its probably the cleanest solution. 
> I’m not keen on Phoenix… wouldn’t recommend it…. 
> 
> 
> The issue is that you’re trying to make HBase, a key/value object store, a 
> Relational Engine… its not. 
> 
> There are some considerations which make HBase not ideal for all use cases 
> and you may find better performance with Parquet files. 
> 
> One thing missing is the use of secondary indexing and query optimizations 
> that you have in RDBMSs and are lacking in HBase / MapRDB / etc …  so your 
> performance will vary. 
> 
> With respect to Tableau… their entire interface in to the big data world 
> revolves around the JDBC/ODBC interface. So if you don’t have that piece as 
> part of your solution, you’re DOA w respect to Tableau. 
> 
> Have you considered Drill as your JDBC connection point?  (YAAP: Yet another 
> Apache project) 
> 
> 
>> On Oct 9, 2016, at 12:23 PM, Benjamin Kim <bbuil...@gmail.com 
>> <mailto:bbuil...@gmail.com>> wrote:
>> 
>> Thanks for all the suggestions. It would seem you guys are right about the 
>> Tableau side of things. The reports don’t need to be real-time, and they 
>> won’t be directly feeding off of the main DMP HBase data. Instead, it’ll be 
>> batched to Parquet or Kudu/Impala or even PostgreSQL.
>> 
>> I originally thought that we needed two-way data retrieval from the DMP 
>> HBase for ID generation, but after further investigation into the use-case 
>> and architecture, the ID generation needs to happen local to the Ad Servers 
>> where we generate a unique ID and store it in a ID linking table. Even 
>> better, many of the 3rd party services supply this ID. So, data only needs 
>> to flow in one direction. We will use Kafka as the bus for this. No JDBC 
>> required. This is also goes for the REST Endpoints. 3rd party services will 
>> hit ours to update our data with no need to read from our data. And, when we 
>> want to update their data, we will hit theirs to update their data using a 
>> triggered job.
>> 
>> This all boils down to just integrating with Kafka.
>> 
>> Once again, thanks for all the help.
>> 
>> Cheers,
>> Ben
>> 
>> 
>>> On Oct 9, 2016, at 3:16 AM, Jörn Franke <jornfra...@gmail.com 
>>> <mailto:jornfra...@gmail.com>> wrote:
>>> 
>>> please keep also in mind that Tableau Server has the capabilities to store 
>>> data in-memory and refresh only when needed the in-memory data. This means 
>>> you can import it from any source and let your users work only on the 
>>> in-memory data in Tableau Server.
>>> 
>>> On Sun, Oct 9, 2016 at 9:22 AM, Jörn Franke <jornfra...@gmail.com 
>>> <mailto:jornfra...@gmail.com>> wrote:
>>> Cloudera 5.8 has a very old version of Hive without Tez, but Mich provided 
>>> already a good alternative. However, you should check if it contains a 
>>> recent version of Hbase and Phoenix. That being said, I just wonder what is 
>>> the dataflow, data model and the analysis you plan to do. Maybe there are 
>>> completely different solutions possible. Especially these single inserts, 
>>> upserts etc. should be avoided

Re: Inserting New Primary Keys

2016-10-10 Thread Benjamin Kim
Jean,

I see your point. For the incremental data, which is very small, I should make 
sure that the PARTITION BY clause in the OVER() is left out so that all the data 
will be in one partition when row numbers are assigned. The query 
below should avoid any problems.

“SELECT ROW_NUMBER() OVER() + b.id_max AS id, a.* FROM source a CROSS JOIN 
(SELECT COALESCE(MAX(id),0) AS id_max FROM tmp_destination) b”.

But initially, I’ll use the monotonicallyIncreasingId function when I first 
load the data.
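
A sketch of both steps in one place (sourceDf and existingDf are stand-ins for the 
DataFrames involved, and the window function needs a HiveContext in Spark 1.6):

import org.apache.spark.sql.functions.monotonicallyIncreasingId

// Initial load: ids are unique and increasing, but not contiguous.
val seededDf = sourceDf.withColumn("id", monotonicallyIncreasingId)

// Incremental load: shift the new row numbers past the current maximum id.
sourceDf.registerTempTable("source")
existingDf.registerTempTable("tmp_destination")

val withIds = sqlContext.sql(
  """SELECT ROW_NUMBER() OVER() + b.id_max AS id, a.*
    |FROM source a
    |CROSS JOIN (SELECT COALESCE(MAX(id), 0) AS id_max FROM tmp_destination) b
  """.stripMargin)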

Thanks,
Ben


> On Oct 10, 2016, at 8:36 AM, Jean Georges Perrin <j...@jgp.net> wrote:
> 
> Is there only one process adding rows? because this seems a little risky if 
> you have multiple threads doing that… 
> 
>> On Oct 8, 2016, at 1:43 PM, Benjamin Kim <bbuil...@gmail.com 
>> <mailto:bbuil...@gmail.com>> wrote:
>> 
>> Mich,
>> 
>> After much searching, I found and am trying to use “SELECT ROW_NUMBER() 
>> OVER() + b.id_max AS id, a.* FROM source a CROSS JOIN (SELECT 
>> COALESCE(MAX(id),0) AS id_max FROM tmp_destination) b”. I think this should 
>> do it.
>> 
>> Thanks,
>> Ben
>> 
>> 
>>> On Oct 8, 2016, at 9:48 AM, Mich Talebzadeh <mich.talebza...@gmail.com 
>>> <mailto:mich.talebza...@gmail.com>> wrote:
>>> 
>>> can you get the max value from the current  table and start from MAX(ID) + 
>>> 1 assuming it is a numeric value (it should be)?
>>> 
>>> HTH
>>> 
>>> HTH
>>> 
>>> Dr Mich Talebzadeh
>>>  
>>> LinkedIn  
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>  
>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>
>>>  
>>> http://talebzadehmich.wordpress.com <http://talebzadehmich.wordpress.com/>
>>> 
>>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>>> loss, damage or destruction of data or any other property which may arise 
>>> from relying on this email's technical content is explicitly disclaimed. 
>>> The author will in no case be liable for any monetary damages arising from 
>>> such loss, damage or destruction.
>>>  
>>> 
>>> On 8 October 2016 at 17:42, Benjamin Kim <bbuil...@gmail.com 
>>> <mailto:bbuil...@gmail.com>> wrote:
>>> I have a table with data already in it that has primary keys generated by 
>>> the function monotonicallyIncreasingId. Now, I want to insert more data 
>>> into it with primary keys that will auto-increment from where the existing 
>>> data left off. How would I do this? There is no argument I can pass into 
>>> the function monotonicallyIncreasingId to seed it.
>>> 
>>> Thanks,
>>> Ben
>>> 
>>> 
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>>> <mailto:user-unsubscr...@spark.apache.org>
>>> 
>>> 
>> 
> 



Inserting New Primary Keys

2016-10-08 Thread Benjamin Kim
I have a table with data already in it that has primary keys generated by the 
function monotonicallyIncreasingId. Now, I want to insert more data into it 
with primary keys that will auto-increment from where the existing data left 
off. How would I do this? There is no argument I can pass into the function 
monotonicallyIncreasingId to seed it.

Thanks,
Ben


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Loading data into Hbase table throws NoClassDefFoundError: org/apache/htrace/Trace error

2016-10-01 Thread Benjamin Kim
Mich,

I know up until CDH 5.4 we had to add the HTrace jar to the classpath to make 
it work using the command below. But after upgrading to CDH 5.7, it became 
unnecessary.

echo "/opt/cloudera/parcels/CDH/jars/htrace-core-3.2.0-incubating.jar" >> 
/etc/spark/conf/classpath.txt

Hope this helps.

Cheers,
Ben


> On Oct 1, 2016, at 3:22 PM, Mich Talebzadeh  wrote:
> 
> Trying bulk load using Hfiles in Spark as below example:
> 
> import org.apache.spark._
> import org.apache.spark.rdd.NewHadoopRDD
> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
> import org.apache.hadoop.hbase.client.HBaseAdmin
> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.hbase.HColumnDescriptor
> import org.apache.hadoop.hbase.util.Bytes
> import org.apache.hadoop.hbase.client.Put;
> import org.apache.hadoop.hbase.client.HTable;
> import org.apache.hadoop.hbase.mapred.TableOutputFormat
> import org.apache.hadoop.mapred.JobConf
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable
> import org.apache.hadoop.mapreduce.Job
> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
> import org.apache.hadoop.hbase.KeyValue
> import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
> import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
> 
> So far no issues.
> 
> Then I do
> 
> val conf = HBaseConfiguration.create()
> conf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, 
> core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, 
> yarn-site.xml, hbase-default.xml, hbase-site.xml
> val tableName = "testTable"
> tableName: String = testTable
> 
> But this one fails:
> 
> scala> val table = new HTable(conf, tableName)
> java.io.IOException: java.lang.reflect.InvocationTargetException
>   at 
> org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:240)
>   at 
> org.apache.hadoop.hbase.client.ConnectionManager.createConnection(ConnectionManager.java:431)
>   at 
> org.apache.hadoop.hbase.client.ConnectionManager.createConnection(ConnectionManager.java:424)
>   at 
> org.apache.hadoop.hbase.client.ConnectionManager.getConnectionInternal(ConnectionManager.java:302)
>   at org.apache.hadoop.hbase.client.HTable.(HTable.java:185)
>   at org.apache.hadoop.hbase.client.HTable.(HTable.java:151)
>   ... 52 elided
> Caused by: java.lang.reflect.InvocationTargetException: 
> java.lang.NoClassDefFoundError: org/apache/htrace/Trace
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:238)
>   ... 57 more
> Caused by: java.lang.NoClassDefFoundError: org/apache/htrace/Trace
>   at 
> org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.exists(RecoverableZooKeeper.java:216)
>   at org.apache.hadoop.hbase.zookeeper.ZKUtil.checkExists(ZKUtil.java:419)
>   at 
> org.apache.hadoop.hbase.zookeeper.ZKClusterId.readClusterIdZNode(ZKClusterId.java:65)
>   at 
> org.apache.hadoop.hbase.client.ZooKeeperRegistry.getClusterId(ZooKeeperRegistry.java:105)
>   at 
> org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.retrieveClusterId(ConnectionManager.java:905)
>   at 
> org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.(ConnectionManager.java:648)
>   ... 62 more
> Caused by: java.lang.ClassNotFoundException: org.apache.htrace.Trace
> 
> I have got all the jar files in spark-defaults.conf
> 
> spark.driver.extraClassPath  
> /home/hduser/jars/ojdbc6.jar:/home/hduser/jars/jconn4.jar:/home/hduser/jars/hbase-client-1.2.3.jar:/home/hduser/jars/hbase-server-1.2.3.jar:/home/hduser/jars/hbase-common-1.2.3.jar:/home/hduser/jars/hbase-protocol-1.2.3.jar:/home/hduser/jars/htrace-core-3.0.4.jar:/home/hduser/jars/hive-hbase-handler-2.1.0.jar
> spark.executor.extraClassPath
> /home/hduser/jars/ojdbc6.jar:/home/hduser/jars/jconn4.jar:/home/hduser/jars/hbase-client-1.2.3.jar:/home/hduser/jars/hbase-server-1.2.3.jar:/home/hduser/jars/hbase-common-1.2.3.jar:/home/hduser/jars/hbase-protocol-1.2.3.jar:/home/hduser/jars/htrace-core-3.0.4.jar:/home/hduser/jars/hive-hbase-handler-2.1.0.jar
> 
> 
> and also in Spark shell where I test the code
> 
>  --jars 
> /home/hduser/jars/hbase-client-1.2.3.jar,/home/hduser/jars/hbase-server-1.2.3.jar,/home/hduser/jars/hbase-common-1.2.3.jar,/home/hduser/jars/hbase-protocol-1.2.3.jar,/home/hduser/jars/htrace-core-3.0.4.jar,/home/hduser/jars/hive-hbase-handler-2.1.0.jar'
> 
> So any ideas will be appreciated.
> 
> 

Re: JDBC Very Slow

2016-09-16 Thread Benjamin Kim
I am testing this in spark-shell. I am following the Spark documentation by 
simply adding the PostgreSQL driver to the Spark Classpath.

SPARK_CLASSPATH=/path/to/postgresql/driver spark-shell

Then, I run the code below to connect to the PostgreSQL database to query. This 
is when I have problems.

Thanks,
Ben


> On Sep 16, 2016, at 3:29 PM, Nikolay Zhebet <phpap...@gmail.com> wrote:
> 
> Hi! Can you split init code with current comand? I thing it is main problem 
> in your code.
> 
> 16 сент. 2016 г. 8:26 PM пользователь "Benjamin Kim" <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> написал:
> Has anyone using Spark 1.6.2 encountered very slow responses from pulling 
> data from PostgreSQL using JDBC? I can get to the table and see the schema, 
> but when I do a show, it takes very long or keeps timing out.
> 
> The code is simple.
> 
> val jdbcDF = sqlContext.read.format("jdbc").options(
> Map("url" -> 
> "jdbc:postgresql://dbserver:port/database?user=user=password",
>"dbtable" -> “schema.table")).load()
> 
> jdbcDF.show
> 
> If anyone can help, please let me know.
> 
> Thanks,
> Ben
> 



JDBC Very Slow

2016-09-16 Thread Benjamin Kim
Has anyone using Spark 1.6.2 encountered very slow responses from pulling data 
from PostgreSQL using JDBC? I can get to the table and see the schema, but when 
I do a show, it takes very long or keeps timing out.

The code is simple.

val jdbcDF = sqlContext.read.format("jdbc").options(
Map("url" -> 
"jdbc:postgresql://dbserver:port/database?user=user=password",
   "dbtable" -> “schema.table")).load()

jdbcDF.show
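
For anyone hitting the same thing: one common cause is that, without partitioning 
options, the JDBC source pulls the whole table through a single connection into one 
partition. A sketch with a partitioned read (the partition column and bounds are 
placeholders and need to match the actual data):

// partitionColumn and the bounds are placeholders; they must name a numeric
// column and a range that roughly covers its values.
val jdbcDF = sqlContext.read.format("jdbc").options(Map(
  "url"             -> "jdbc:postgresql://dbserver:port/database?user=user&password=password",
  "dbtable"         -> "schema.table",
  "partitionColumn" -> "id",
  "lowerBound"      -> "1",
  "upperBound"      -> "1000000",
  "numPartitions"   -> "8")).load()

jdbcDF.show()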

If anyone can help, please let me know.

Thanks,
Ben



Re: Using Spark SQL to Create JDBC Tables

2016-09-13 Thread Benjamin Kim
Thank you for the idea. I will look for a PostgreSQL Serde for Hive. But, if 
you don’t mind me asking, how did you install the Oracle Serde?

Cheers,
Ben


> On Sep 13, 2016, at 7:12 PM, ayan guha <guha.a...@gmail.com> wrote:
> 
> One option is have Hive as the central point of exposing data ie create hive 
> tables which "point to" any other DB. i know Oracle provides there own Serde 
> for hive. Not sure about PG though.
> 
> Once tables are created in hive, STS will automatically see it. 
> 
> On Wed, Sep 14, 2016 at 11:08 AM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Has anyone created tables using Spark SQL that directly connect to a JDBC 
> data source such as PostgreSQL? I would like to use Spark SQL Thriftserver to 
> access and query remote PostgreSQL tables. In this way, we can centralize 
> data access to Spark SQL tables along with PostgreSQL making it very 
> convenient for users. They would not know or care where the data is 
> physically located anymore.
> 
> By the way, our users only know SQL.
> 
> If anyone has a better suggestion, then please let me know too.
> 
> Thanks,
> Ben
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> <mailto:user-unsubscr...@spark.apache.org>
> 
> 
> 
> 
> -- 
> Best Regards,
> Ayan Guha



Using Spark SQL to Create JDBC Tables

2016-09-13 Thread Benjamin Kim
Has anyone created tables using Spark SQL that directly connect to a JDBC data 
source such as PostgreSQL? I would like to use Spark SQL Thriftserver to access 
and query remote PostgreSQL tables. In this way, we can centralize data access 
to Spark SQL tables along with PostgreSQL, making it very convenient for users. 
They would not know or care where the data is physically located anymore.
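
One way to do this in Spark 1.6 (a sketch; connection details and the table name 
are placeholders): register a table backed by the JDBC data source in the same SQL 
context the Thriftserver exposes. A temporary table only lives as long as that 
context, so it has to be re-registered after a restart.

// Run in the SQL context that the Thriftserver exposes (for example through
// beeline, or programmatically before starting it).
sqlContext.sql("""
  CREATE TEMPORARY TABLE pg_customers
  USING org.apache.spark.sql.jdbc
  OPTIONS (
    url "jdbc:postgresql://dbserver:5432/database?user=user&password=password",
    dbtable "schema.customers"
  )
""")

sqlContext.sql("SELECT COUNT(*) FROM pg_customers").show()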

By the way, our users only know SQL.

If anyone has a better suggestion, then please let me know too.

Thanks,
Ben
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark SQL Thriftserver

2016-09-13 Thread Benjamin Kim
Mich,

It sounds like there would be no harm in changing, then. Are you saying 
that using STS would still use MapReduce to run the SQL statements? What our 
users are doing in our CDH 5.7.2 installation is changing the execution engine 
to Spark when connected to HiveServer2 to get faster results. Would they still 
have to do this using STS? Lastly, we are seeing zombie YARN jobs left behind 
even after a user disconnects. Are you seeing this happen with STS? If not, 
then this would be even better.

Thanks for your fast reply.

Cheers,
Ben

> On Sep 13, 2016, at 3:15 PM, Mich Talebzadeh <mich.talebza...@gmail.com> 
> wrote:
> 
> Hi,
> 
> Spark Thrift server (STS) still uses hive thrift server. If you look at 
> $SPARK_HOME/sbin/start-thriftserver.sh you will see (mine is Spark 2)
> 
> function usage {
>   echo "Usage: ./sbin/start-thriftserver [options] [thrift server options]"
>   pattern="usage"
>   pattern+="\|Spark assembly has been built with Hive"
>   pattern+="\|NOTE: SPARK_PREPEND_CLASSES is set"
>   pattern+="\|Spark Command: "
>   pattern+="\|==="
>   pattern+="\|--help"
> 
> 
> Indeed when you start STS, you pass hiveconf parameter to it
> 
> ${SPARK_HOME}/sbin/start-thriftserver.sh \
> --master  \
> --hiveconf hive.server2.thrift.port=10055 \
> 
> and STS bypasses Spark optimiser and uses Hive optimizer and execution 
> engine. You will see this in hive.log file
> 
> So I don't think it is going to give you much difference. Unless they have 
> recently changed the design of STS.
> 
> HTH
> 
> 
> 
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>
>  
> http://talebzadehmich.wordpress.com <http://talebzadehmich.wordpress.com/>
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> On 13 September 2016 at 22:32, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Does anyone have any thoughts about using Spark SQL Thriftserver in Spark 
> 1.6.2 instead of HiveServer2? We are considering abandoning HiveServer2 for 
> it. Some advice and gotcha’s would be nice to know.
> 
> Thanks,
> Ben
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> <mailto:user-unsubscr...@spark.apache.org>
> 
> 



Spark SQL Thriftserver

2016-09-13 Thread Benjamin Kim
Does anyone have any thoughts about using Spark SQL Thriftserver in Spark 1.6.2 
instead of HiveServer2? We are considering abandoning HiveServer2 for it. Some 
advice and gotchas would be nice to know.

Thanks,
Ben
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Metrics: custom source/sink configurations not getting recognized

2016-09-07 Thread Benjamin Kim
We use Graphite/Grafana for custom metrics. We found Spark's built-in metrics not 
customizable enough for our needs, so we write directly to Graphite using its API, 
which was very easy to do with Java's socket library in Scala. It works great for 
us, and we are going one step further with Sensu to alert us if there is an anomaly 
in the metrics beyond the norm.
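
A minimal sketch of what that can look like: Graphite's plaintext protocol is one 
line per data point, "<path> <value> <epoch seconds>", sent to the Carbon listener 
(port 2003 by default). Host and metric names below are illustrative.

import java.io.PrintWriter
import java.net.Socket

object GraphiteSink {
  def send(host: String, port: Int, metric: String, value: Double): Unit = {
    val socket = new Socket(host, port)
    val out = new PrintWriter(socket.getOutputStream, true)
    try {
      // one plaintext-protocol line per data point
      out.println(s"$metric $value ${System.currentTimeMillis / 1000}")
    } finally {
      out.close()
      socket.close()
    }
  }
}

// e.g. GraphiteSink.send("graphite.example.com", 2003, "spark.myapp.records_processed", 1234.0)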

Hope this helps.

Cheers,
Ben


> On Sep 6, 2016, at 9:52 PM, map reduced  wrote:
> 
> Hi, anyone has any ideas please?
> 
> On Mon, Sep 5, 2016 at 8:30 PM, map reduced  > wrote:
> Hi,
> 
> I've written my custom metrics source/sink for my Spark streaming app and I 
> am trying to initialize it from metrics.properties - but that doesn't work 
> from executors. I don't have control on the machines in Spark cluster, so I 
> can't copy properties file in $SPARK_HOME/conf/ in the cluster. I have it in 
> the fat jar where my app lives, but by the time my fat jar is downloaded on 
> worker nodes in cluster, executors are already started and their Metrics 
> system is already initialized - thus not picking my file with custom source 
> configuration in it.
> 
> Following this post 
> ,
>  I've specified 'spark.files 
>  = 
> metrics.properties' and 'spark.metrics.conf=metrics.properties' but by the 
> time 'metrics.properties' is shipped to executors, their metric system is 
> already initialized.
> 
> If I initialize my own metrics system, it's picking up my file but then I'm 
> missing master/executor level metrics/properties (eg. 
> executor.sink.mySink.propName=myProp - can't read 'propName' from 'mySink') 
> since they are initialized 
> 
>  by Spark's metric system.
> 
> Is there a (programmatic) way to have 'metrics.properties' shipped before 
> executors initialize 
> 
>  ?
> 
> Here's my SO question 
> .
> 
> Thanks,
> 
> KP
> 
> 



Re: Spark SQL Tables on top of HBase Tables

2016-09-03 Thread Benjamin Kim
I’m using Spark 1.6 and HBase 1.2. Have you got it to work using these versions?

> On Sep 3, 2016, at 12:49 PM, Mich Talebzadeh <mich.talebza...@gmail.com> 
> wrote:
> 
> I am trying to find a solution for this
> 
> ERROR log: error in initSerDe: java.lang.ClassNotFoundException Class 
> org.apache.hadoop.hive.hbase.HBaseSerDe not found
> 
> I am using Spark 2 and Hive 2!
> 
> HTH
> 
> 
> 
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>
>  
> http://talebzadehmich.wordpress.com <http://talebzadehmich.wordpress.com/>
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> On 3 September 2016 at 20:31, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Mich,
> 
> I’m in the same boat. We can use Hive but not Spark.
> 
> Cheers,
> Ben
> 
>> On Sep 2, 2016, at 3:37 PM, Mich Talebzadeh <mich.talebza...@gmail.com 
>> <mailto:mich.talebza...@gmail.com>> wrote:
>> 
>> Hi,
>> 
>> You can create Hive external  tables on top of existing Hbase table using 
>> the property
>> 
>> STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
>> 
>> Example
>> 
>> hive> show create table hbase_table;
>> OK
>> CREATE TABLE `hbase_table`(
>>   `key` int COMMENT '',
>>   `value1` string COMMENT '',
>>   `value2` int COMMENT '',
>>   `value3` int COMMENT '')
>> ROW FORMAT SERDE
>>   'org.apache.hadoop.hive.hbase.HBaseSerDe'
>> STORED BY
>>   'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
>> WITH SERDEPROPERTIES (
>>   'hbase.columns.mapping'=':key,a:b,a:c,d:e',
>>   'serialization.format'='1')
>> TBLPROPERTIES (
>>   'transient_lastDdlTime'='1472370939')
>> 
>>  Then try to access this Hive table from Spark which is giving me grief at 
>> the moment :(
>> 
>> scala> HiveContext.sql("use test")
>> res9: org.apache.spark.sql.DataFrame = []
>> scala> val hbase_table= spark.table("hbase_table")
>> 16/09/02 23:31:07 ERROR log: error in initSerDe: 
>> java.lang.ClassNotFoundException Class 
>> org.apache.hadoop.hive.hbase.HBaseSerDe not found
>> 
>> HTH
>> 
>> 
>> Dr Mich Talebzadeh
>>  
>> LinkedIn  
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>  
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>
>>  
>> http://talebzadehmich.wordpress.com <http://talebzadehmich.wordpress.com/>
>> 
>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>> loss, damage or destruction of data or any other property which may arise 
>> from relying on this email's technical content is explicitly disclaimed. The 
>> author will in no case be liable for any monetary damages arising from such 
>> loss, damage or destruction.
>>  
>> 
>> On 2 September 2016 at 23:08, KhajaAsmath Mohammed <mdkhajaasm...@gmail.com 
>> <mailto:mdkhajaasm...@gmail.com>> wrote:
>> Hi Kim,
>> 
>> I am also looking for same information. Just got the same requirement today.
>> 
>> Thanks,
>> Asmath
>> 
>> On Fri, Sep 2, 2016 at 4:46 PM, Benjamin Kim <bbuil...@gmail.com 
>> <mailto:bbuil...@gmail.com>> wrote:
>> I was wondering if anyone has tried to create Spark SQL tables on top of 
>> HBase tables so that data in HBase can be accessed using Spark Thriftserver 
>> with SQL statements? This is similar what can be done using Hive.
>> 
>> Thanks,
>> Ben
>> 
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>> <mailto:user-unsubscr...@spark.apache.org>
>> 
>> 
>> 
> 
> 



Re: Spark SQL Tables on top of HBase Tables

2016-09-03 Thread Benjamin Kim
Mich,

I’m in the same boat. We can use Hive but not Spark.

Cheers,
Ben

> On Sep 2, 2016, at 3:37 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> 
> Hi,
> 
> You can create Hive external  tables on top of existing Hbase table using the 
> property
> 
> STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
> 
> Example
> 
> hive> show create table hbase_table;
> OK
> CREATE TABLE `hbase_table`(
>   `key` int COMMENT '',
>   `value1` string COMMENT '',
>   `value2` int COMMENT '',
>   `value3` int COMMENT '')
> ROW FORMAT SERDE
>   'org.apache.hadoop.hive.hbase.HBaseSerDe'
> STORED BY
>   'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
> WITH SERDEPROPERTIES (
>   'hbase.columns.mapping'=':key,a:b,a:c,d:e',
>   'serialization.format'='1')
> TBLPROPERTIES (
>   'transient_lastDdlTime'='1472370939')
> 
>  Then try to access this Hive table from Spark which is giving me grief at 
> the moment :(
> 
> scala> HiveContext.sql("use test")
> res9: org.apache.spark.sql.DataFrame = []
> scala> val hbase_table= spark.table("hbase_table")
> 16/09/02 23:31:07 ERROR log: error in initSerDe: 
> java.lang.ClassNotFoundException Class 
> org.apache.hadoop.hive.hbase.HBaseSerDe not found
> 
> HTH
> 
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>
>  
> http://talebzadehmich.wordpress.com <http://talebzadehmich.wordpress.com/>
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> On 2 September 2016 at 23:08, KhajaAsmath Mohammed <mdkhajaasm...@gmail.com 
> <mailto:mdkhajaasm...@gmail.com>> wrote:
> Hi Kim,
> 
> I am also looking for same information. Just got the same requirement today.
> 
> Thanks,
> Asmath
> 
> On Fri, Sep 2, 2016 at 4:46 PM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> I was wondering if anyone has tried to create Spark SQL tables on top of 
> HBase tables so that data in HBase can be accessed using Spark Thriftserver 
> with SQL statements? This is similar what can be done using Hive.
> 
> Thanks,
> Ben
> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> <mailto:user-unsubscr...@spark.apache.org>
> 
> 
> 



Spark SQL Tables on top of HBase Tables

2016-09-02 Thread Benjamin Kim
I was wondering if anyone has tried to create Spark SQL tables on top of HBase 
tables so that data in HBase can be accessed using Spark Thriftserver with SQL 
statements? This is similar to what can be done using Hive.

Thanks,
Ben


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark 1.6 Streaming with Checkpointing

2016-08-26 Thread Benjamin Kim
I am trying to implement checkpointing in my streaming application but I am 
getting a not serializable error. Has anyone encountered this? I am deploying 
this job in YARN clustered mode.

Here is a snippet of the main parts of the code.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

object S3EventIngestion {
    // Create and set up the streaming context
    def createContext(
        batchInterval: Int, checkpointDirectory: String, awsS3BucketName: String,
        databaseName: String, tableName: String, partitionByColumnName: String
    ): StreamingContext = {

        println("Creating new context")
        val sparkConf = new SparkConf().setAppName("S3EventIngestion")
        val sc = new SparkContext(sparkConf)

        // Create the streaming context with the batch interval
        val ssc = new StreamingContext(sc, Seconds(batchInterval))

        // Create a text file stream on an S3 bucket
        val csv = ssc.textFileStream("s3a://" + awsS3BucketName + "/")

        csv.foreachRDD(rdd => {
            if (!rdd.partitions.isEmpty) {
                // With checkpointing enabled, everything this closure captures must be
                // serializable. A SQLContext created outside foreachRDD is a common cause
                // of the NotSerializableException; the documented pattern is to get it here:
                val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
                // process data
            }
        })

        ssc.checkpoint(checkpointDirectory)
        ssc
    }

    def main(args: Array[String]): Unit = {
        if (args.length != 6) {
            // Argument names below mirror createContext's parameters
            System.err.println("Usage: S3EventIngestion <batchInterval> <checkpointDir> " +
                "<s3Bucket> <database> <table> <partitionByColumn>")
            System.exit(1)
        }

        val Array(interval, checkpoint, bucket, database, table, partitionBy) = args

        // Get the streaming context from checkpoint data or create a new one
        val context = StreamingContext.getOrCreate(checkpoint,
            () => createContext(interval.toInt, checkpoint, bucket, database, table, partitionBy))

        // Start the streaming context
        context.start()
        context.awaitTermination()
    }
}

Can someone help please?

Thanks,
Ben
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



HBase-Spark Module

2016-07-29 Thread Benjamin Kim
I would like to know if anyone has tried using the hbase-spark module? I tried 
to follow the examples in conjunction with CDH 5.8.0. I cannot find the 
HBaseTableCatalog class in the module or in any of the Spark jars. Can someone 
help?

Thanks,
Ben
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How to connect HBase and Spark using Python?

2016-07-22 Thread Benjamin Kim
It is included in Cloudera’s CDH 5.8.

> On Jul 22, 2016, at 6:13 PM, Mail.com  wrote:
> 
> Hbase Spark module will be available with Hbase 2.0. Is that out yet?
> 
>> On Jul 22, 2016, at 8:50 PM, Def_Os  wrote:
>> 
>> So it appears it should be possible to use HBase's new hbase-spark module, if
>> you follow this pattern:
>> https://hbase.apache.org/book.html#_sparksql_dataframes
>> 
>> Unfortunately, when I run my example from PySpark, I get the following
>> exception:
>> 
>> 
>>> py4j.protocol.Py4JJavaError: An error occurred while calling o120.save.
>>> : java.lang.RuntimeException: org.apache.hadoop.hbase.spark.DefaultSource
>>> does not allow create table as select.
>>>   at scala.sys.package$.error(package.scala:27)
>>>   at
>>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:259)
>>>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>   at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>   at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>   at java.lang.reflect.Method.invoke(Method.java:606)
>>>   at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>>>   at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
>>>   at py4j.Gateway.invoke(Gateway.java:259)
>>>   at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>>>   at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>   at py4j.GatewayConnection.run(GatewayConnection.java:209)
>>>   at java.lang.Thread.run(Thread.java:745)
>> 
>> Even when I created the table in HBase first, it still failed.
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-connect-HBase-and-Spark-using-Python-tp27372p27397.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 
> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: transtition SQLContext to SparkSession

2016-07-18 Thread Benjamin Kim
From what I read, there are no more separate contexts.

"SparkContext, SQLContext, HiveContext merged into SparkSession"

I have not tested it myself, so I don’t know how far the merge goes.
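
For reference, a sketch of the Spark 2.x entry point (the app name and path are 
illustrative); the old contexts are wrapped rather than removed, and the 
SparkContext is still reachable from the session:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("MyApp")
  .enableHiveSupport()                     // takes the place of HiveContext
  .getOrCreate()

val sc = spark.sparkContext                // the underlying SparkContext is still there
val df = spark.read.parquet("/some/path")  // what sqlContext.read used to do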

Cheers,
Ben


> On Jul 18, 2016, at 8:37 AM, Koert Kuipers  wrote:
> 
> in my codebase i would like to gradually transition to SparkSession, so while 
> i start using SparkSession i also want a SQLContext to be available as before 
> (but with a deprecated warning when i use it). this should be easy since 
> SQLContext is now a wrapper for SparkSession.
> 
> so basically:
> val session = SparkSession.builder.set(..., ...).getOrCreate()
> val sqlc = new SQLContext(session)
> 
> however this doesnt work, the SQLContext constructor i am trying to use is 
> private. SparkSession.sqlContext is also private.
> 
> am i missing something?
> 
> a non-gradual switch is not very realistic in any significant codebase, and i 
> do not want to create SparkSession and SQLContext independendly (both from 
> same SparkContext) since that can only lead to confusion and inconsistent 
> settings.



Re: Spark Website

2016-07-13 Thread Benjamin Kim
It takes me to the directories instead of the webpage.

> On Jul 13, 2016, at 11:45 AM, manish ranjan <cse1.man...@gmail.com> wrote:
> 
> working for me. What do you mean 'as supposed to'?
> 
> ~Manish
> 
> 
> 
> On Wed, Jul 13, 2016 at 11:45 AM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Has anyone noticed that the spark.apache.org <http://spark.apache.org/> is 
> not working as supposed to?
> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> <mailto:user-unsubscr...@spark.apache.org>
> 
> 



Spark Website

2016-07-13 Thread Benjamin Kim
Has anyone noticed that the spark.apache.org is not working as supposed to?


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: SnappyData and Structured Streaming

2016-07-06 Thread Benjamin Kim
Jags,

Thanks for the details. This makes things much clearer. I saw in the Spark 
roadmap that version 2.1 will add the SQL capabilities mentioned here. It looks 
like, gradually, the Spark community is coming to the same conclusions that the 
SnappyData folks have come to a while back in terms of Streaming. But, there is 
always the need for a better way to store data underlying Spark. The State 
Store information was informative too. I can envision that it can use this data 
store too if need be.

Thanks again,
Ben

> On Jul 6, 2016, at 8:52 AM, Jags Ramnarayan <jramnara...@snappydata.io> wrote:
> 
> The plan is to fully integrate with the new structured streaming API and 
> implementation in an upcoming release. But, we will continue offering several 
> extensions. Few noted below ...
> 
> - the store (streaming sink) will offer a lot more capabilities like 
> transactions, replicated tables, partitioned row and column oriented tables 
> to suit different types of workloads. 
> - While streaming API(scala) in snappydata itself will change a bit to become 
> fully compatible with structured streaming(SchemaDStream will go away), we 
> will continue to offer SQL support for streams so they can be managed from 
> external clients (JDBC, ODBC), their partitions can share the same 
> partitioning strategy as the underlying table where it might be stored, and 
> even registrations of continuous queries from remote clients. 
> 
> While building streaming apps using the Spark APi offers tremendous 
> flexibility we also want to make it simple for apps to work with streams just 
> using SQL. For instance, you should be able to declaratively specify a table 
> as a sink to a stream(i.e. using SQL). For example, you can specify a "TopK 
> Table" (a built in special table for topK analytics using probabilistic data 
> structures) as a sink for a high velocity time series stream like this - 
> "create topK table MostPopularTweets on tweetStreamTable " +
> "options(key 'hashtag', frequencyCol 'retweets', timeSeriesColumn 
> 'tweetTime' )" 
> where 'tweetStreamTable' is created using the 'create stream table ...' SQL 
> syntax. 
> 
> 
> -
> Jags
> SnappyData blog <http://www.snappydata.io/blog>
> Download binary, source <https://github.com/SnappyDataInc/snappydata>
> 
> 
> On Wed, Jul 6, 2016 at 8:02 PM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Jags,
> 
> I should have been more specific. I am referring to what I read at 
> http://snappydatainc.github.io/snappydata/streamingWithSQL/ 
> <http://snappydatainc.github.io/snappydata/streamingWithSQL/>, especially the 
> Streaming Tables part. It roughly coincides with the Streaming DataFrames 
> outlined here 
> https://docs.google.com/document/d/1NHKdRSNCbCmJbinLmZuqNA1Pt6CGpFnLVRbzuDUcZVM/edit#heading=h.ff0opfdo6q1h
>  
> <https://docs.google.com/document/d/1NHKdRSNCbCmJbinLmZuqNA1Pt6CGpFnLVRbzuDUcZVM/edit#heading=h.ff0opfdo6q1h>.
>  I don’t know if I’m wrong, but they both sound very similar. That’s why I posed 
> this question.
> 
> Thanks,
> Ben
> 
>> On Jul 6, 2016, at 7:03 AM, Jags Ramnarayan <jramnara...@snappydata.io 
>> <mailto:jramnara...@snappydata.io>> wrote:
>> 
>> Ben,
>>Note that Snappydata's primary objective is to be a distributed in-memory 
>> DB for mixed workloads (i.e. streaming with transactions and analytic 
>> queries). On the other hand, Spark, till date, is primarily designed as a 
>> processing engine over myriad storage engines (SnappyData being one). So, 
>> the marriage is quite complementary. The difference compared to other stores 
>> is that SnappyData realizes its solution by deeply integrating and 
>> collocating with Spark (i.e. share spark executor memory/resources with the 
>> store) avoiding serializations and shuffle in many situations.
>> 
>> On your specific thought about being similar to Structured streaming, a 
>> better discussion could be a comparison to the recently introduced State 
>> store 
>> <https://docs.google.com/document/d/1-ncawFx8JS5Zyfq1HAEGBx56RDet9wfVp_hDM8ZL254/edit#heading=h.2h7zw4ru3nw7>
>>  (perhaps this is what you meant). 
>> It proposes a KV store for streaming aggregations with support for updates. 
>> The proposed API will, at some point, be pluggable so vendors can easily 
>> support alternate implementations to storage, not just HDFS(default store in 
>> proposed State store). 
>> 
>> 
>> -
>> Jags
>> SnappyData blog <http://www.snappydata.io/blog>
>> Download binary, source <https://github.com/SnappyDataInc/snappydata>

Re: SnappyData and Structured Streaming

2016-07-06 Thread Benjamin Kim
Jags,

I should have been more specific. I am referring to what I read at 
http://snappydatainc.github.io/snappydata/streamingWithSQL/, especially the 
Streaming Tables part. It roughly coincides with the Streaming DataFrames 
outlined here 
https://docs.google.com/document/d/1NHKdRSNCbCmJbinLmZuqNA1Pt6CGpFnLVRbzuDUcZVM/edit#heading=h.ff0opfdo6q1h.
 I don’t know if I’m wrong, but they both sound very similar. That’s why I posed 
this question.

Thanks,
Ben

> On Jul 6, 2016, at 7:03 AM, Jags Ramnarayan <jramnara...@snappydata.io> wrote:
> 
> Ben,
>Note that Snappydata's primary objective is to be a distributed in-memory 
> DB for mixed workloads (i.e. streaming with transactions and analytic 
> queries). On the other hand, Spark, till date, is primarily designed as a 
> processing engine over myriad storage engines (SnappyData being one). So, the 
> marriage is quite complementary. The difference compared to other stores is 
> that SnappyData realizes its solution by deeply integrating and collocating 
> with Spark (i.e. share spark executor memory/resources with the store) 
> avoiding serializations and shuffle in many situations.
> 
> On your specific thought about being similar to Structured streaming, a 
> better discussion could be a comparison to the recently introduced State 
> store 
> <https://docs.google.com/document/d/1-ncawFx8JS5Zyfq1HAEGBx56RDet9wfVp_hDM8ZL254/edit#heading=h.2h7zw4ru3nw7>
>  (perhaps this is what you meant). 
> It proposes a KV store for streaming aggregations with support for updates. 
> The proposed API will, at some point, be pluggable so vendors can easily 
> support alternate implementations to storage, not just HDFS(default store in 
> proposed State store). 
> 
> 
> -
> Jags
> SnappyData blog <http://www.snappydata.io/blog>
> Download binary, source <https://github.com/SnappyDataInc/snappydata>
> 
> 
> On Wed, Jul 6, 2016 at 12:49 AM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> I recently got a sales email from SnappyData, and after reading the 
> documentation about what they offer, it sounds very similar to what 
> Structured Streaming will offer w/o the underlying in-memory, spill-to-disk, 
> CRUD compliant data storage in SnappyData. I was wondering if Structured 
> Streaming is trying to achieve the same on its own or is SnappyData 
> contributing Streaming extensions that they built to the Spark project. 
> Lastly, what does the Spark community think of this so-called “Spark Data 
> Store”?
> 
> Thanks,
> Ben
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> <mailto:user-unsubscr...@spark.apache.org>
> 
> 



SnappyData and Structured Streaming

2016-07-05 Thread Benjamin Kim
I recently got a sales email from SnappyData, and after reading the 
documentation about what they offer, it sounds very similar to what Structured 
Streaming will offer w/o the underlying in-memory, spill-to-disk, CRUD 
compliant data storage in SnappyData. I was wondering if Structured Streaming 
is trying to achieve the same on its own or is SnappyData contributing 
Streaming extensions that they built to the Spark project. Lastly, what does 
the Spark community think of this so-called “Spark Data Store”?

Thanks,
Ben
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Kudu Connector

2016-06-29 Thread Benjamin Kim
I was wondering if anyone who is a Spark Scala developer would be willing to 
continue the work done on the Kudu connector.

https://github.com/apache/incubator-kudu/tree/master/java/kudu-spark/src/main/scala/org/kududb/spark/kudu

I have been testing and using Kudu for the past month and comparing against 
HBase. It seems like a promising data store to complement Spark. It fills the 
gap in our company as a fast updatable data store. We stream GB’s of data in 
and run analytical queries against it, which run in well below a minute 
typically. According to the Kudu users group, all it needs is to add SQL (JDBC) 
friendly features (CREATE TABLE, intuitive save modes (append = upsert and 
overwrite = truncate + insert), DELETE, etc.) and improve performance by 
implementing locality.
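
For anyone who wants to poke at it, here is a read-side sketch against that module. 
The master address and table name are placeholders, and the API was still evolving 
at the time, so details may differ between versions.

// kudu-spark has to be on the classpath (e.g. via --jars); the data source
// package matches the repository path linked above.
val kuduDf = sqlContext.read
  .options(Map(
    "kudu.master" -> "kudu-master.example.com:7051",
    "kudu.table"  -> "my_table"))
  .format("org.kududb.spark.kudu")
  .load()

kuduDf.registerTempTable("my_table")
sqlContext.sql("SELECT COUNT(*) FROM my_table").show()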

For reference, here is the page on contributing.

http://kudu.apache.org/docs/contributing.html

I am hoping that for individuals in the Spark community it would be relatively 
easy.

Thanks!
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Model Quality Tracking

2016-06-24 Thread Benjamin Kim
Has anyone implemented a way to track the performance of a data model? We 
currently have an algorithm to do record linkage and spit out statistics of 
matches, non-matches, and/or partial matches with reason codes of why we didn’t 
match accurately. In this way, we will know if something goes wrong down the 
line. All of this goes into CSV file directories partitioned by datetime, with 
a Hive table on top. Then, we can do analytical queries and even charting if 
need be. All of this is very manual, but I was wondering if there is a package, 
software, built-in module, etc. that would do this automatically. Since we are 
using CDH, it would be great if these graphs could be integrated into Cloudera 
Manager too.

Any advice is welcome.

Thanks,
Ben


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Data Integrity / Model Quality Monitoring

2016-06-17 Thread Benjamin Kim
Has anyone run into this requirement?

We have a need to track data integrity and model quality metrics of outcomes so 
that we can gauge both whether the incoming data is healthy and whether the models 
run against it are still performing and not giving faulty results. A nice-to-have 
would be to graph these over time somehow. Since we are using Cloudera Manager, 
graphing in there would be a plus.

Any advice or suggestions would be welcome.

Thanks,
Ben
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Save to a Partitioned Table using a Derived Column

2016-06-03 Thread Benjamin Kim
e  string
> timestamp   string
> event_valid int
> event_subtype   string
> user_ip string
> user_id string
> cookie_status   string
> profile_status  string
> user_status string
> previous_timestamp  string
> user_agent  string
> referer string
> uri string
> request_elapsed bigint
> browser_languages   string
> acamp_idint
> creative_id int
> location_id int
> pcamp_idint
> pdomain_id  int
> country string
> region  string
> dma int
> citystring
> zip string
> isp string
> line_speed  string
> gender  string
> year_of_birth   int
> behaviors_read  string
> behaviors_written   string
> key_value_pairs string
> acamp_candidatesint
> tag_format  string
> optimizer_name  string
> optimizer_version   string
> optimizer_ipstring
> pixel_idint
> video_idstring
> video_network_idint
> video_time_watched  bigint
> video_percentage_watchedint
> conversion_valid_sale   int
> conversion_sale_amount  float
> conversion_commission_amountfloat
> conversion_step int
> conversion_currency string
> conversion_attribution  int
> conversion_offer_id string
> custom_info string
> frequency   int
> recency_seconds int
> costfloat
> revenue float
> optimizer_acamp_id  int
> optimizer_creative_id   int
> optimizer_ecpm  float
> event_idstring
> impression_id   string
> diagnostic_data string
> user_profile_mapping_source string
> latitudefloat
> longitude   float
> area_code   int
> gmt_offset  string
> in_dst  string
> proxy_type  string
> mobile_carrier  string
> pop string
> hostnamestring
> profile_ttl string
> timestamp_iso   string
> reference_idstring
> identity_organization   string
> identity_method string
> mappable_id string
> profile_expires string
> video_player_iframedint
> video_player_in_viewint
> video_player_width  int
> video_player_height int
> host_domain string
> browser_typestring
> browser_device_cat  string
> browser_family  string
> browser_namestring
> browser_version string
> browser_major_version   string
> browser_minor_version   string
> os_family   string
> os_name string
> os_version  string
> os_major_versionstring
> os_minor_versionstring
> # Partition Information
> # col_name  data_type   comment
> dt  timestamp
> # Detailed Table Information
> Database:   test
> Owner:  hduser
> CreateTime: Fri Jun 03 19:03:20 BST 2016
> LastAccessTime: UNKNOWN
> Retention:  0
> Location:   
> hdfs://rhes564:9000/user/hive/warehouse/test.db/amo_bi_events
> Table Type: EXTERNAL_TABLE
> Table Parameters:
> EXTERNALTRUE
> transient_lastDdlTime   1464977000
> # Storage Information
> SerDe Library:  
> org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
> InputFormat:
> org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
> OutputFormat:   
> org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
> Compressed: No
> Num Buckets:-1
> Bucket Columns: []
> Sort Columns:   []
> Storage Desc Params:
> serialization.format1
> Time taken: 0.397 seconds, Fetched: 124 row(s)
> 
> So effectively that table is partitioned by dt in notime
> 
> Now what I don't understand whether that table is already partitioned as you 
> said the table already exists!
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>
>  
> http://talebzadehmich.wordpress.com <http://t

Re: Save to a Partitioned Table using a Derived Column

2016-06-03 Thread Benjamin Kim
Mich,

I am using .withColumn to add another column “dt” that is a reformatted version 
of an existing column “timestamp”. The partition column is “dt”.

We are using Spark 1.6.0 in CDH 5.7.0.
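
One workaround worth trying when writing into a table whose partitioning is already 
defined in Hive (a sketch under the assumption that the mismatch comes from column 
ordering, not a confirmed fix for this exact error): reorder the DataFrame so the 
partition column comes last and use insertInto, which maps columns by position 
against the existing table definition, instead of saveAsTable.

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, concat, lit, substring}

val partitionedDf = df.withColumn("dt",
  concat(substring(col("timestamp"), 1, 10), lit(" "),
         substring(col("timestamp"), 12, 2), lit(":00")))

// Put the partition column last so the positional mapping lines up with the
// Hive definition (the data columns followed by dt).
val ordered = partitionedDf.select(
  (partitionedDf.columns.filterNot(_ == "dt").map(col) :+ col("dt")): _*)

sqlContext.setConf("hive.exec.dynamic.partition", "true")
sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

ordered.write.mode(SaveMode.Append).insertInto("ds.amo_bi_events")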

Thanks,
Ben

> On Jun 3, 2016, at 10:33 AM, Mich Talebzadeh <mich.talebza...@gmail.com> 
> wrote:
> 
> what version of spark are you using
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>
>  
> http://talebzadehmich.wordpress.com <http://talebzadehmich.wordpress.com/>
>  
> 
> On 3 June 2016 at 17:51, Mich Talebzadeh <mich.talebza...@gmail.com 
> <mailto:mich.talebza...@gmail.com>> wrote:
> ok what is the new column is called? you are basically adding a new column to 
> an already existing table
> 
> 
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>
>  
> http://talebzadehmich.wordpress.com <http://talebzadehmich.wordpress.com/>
>  
> 
> On 3 June 2016 at 17:04, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> The table already exists.
> 
>  CREATE EXTERNAL TABLE `amo_bi_events`(
>    `event_type` string COMMENT '',
>    `timestamp` string COMMENT '',
>    `event_valid` int COMMENT '',
>    `event_subtype` string COMMENT '',
>    `user_ip` string COMMENT '',
>    `user_id` string COMMENT '',
>    `cookie_status` string COMMENT '',
>    `profile_status` string COMMENT '',
>    `user_status` string COMMENT '',
>    `previous_timestamp` string COMMENT '',
>    `user_agent` string COMMENT '',
>    `referer` string COMMENT '',
>    `uri` string COMMENT '',
>    `request_elapsed` bigint COMMENT '',
>    `browser_languages` string COMMENT '',
>    `acamp_id` int COMMENT '',
>    `creative_id` int COMMENT '',
>    `location_id` int COMMENT '',
>    `pcamp_id` int COMMENT '',
>    `pdomain_id` int COMMENT '',
>    `country` string COMMENT '',
>    `region` string COMMENT '',
>    `dma` int COMMENT '',
>    `city` string COMMENT '',
>    `zip` string COMMENT '',
>    `isp` string COMMENT '',
>    `line_speed` string COMMENT '',
>    `gender` string COMMENT '',
>    `year_of_birth` int COMMENT '',
>    `behaviors_read` string COMMENT '',
>    `behaviors_written` string COMMENT '',
>    `key_value_pairs` string COMMENT '',
>    `acamp_candidates` int COMMENT '',
>    `tag_format` string COMMENT '',
>    `optimizer_name` string COMMENT '',
>    `optimizer_version` string COMMENT '',

Re: Save to a Partitioned Table using a Derived Column

2016-06-03 Thread Benjamin Kim
   '',
   `event_id` string COMMENT '',
   `impression_id` string COMMENT '',
   `diagnostic_data` string COMMENT '',
   `user_profile_mapping_source` string COMMENT '',
   `latitude` float COMMENT '',
   `longitude` float COMMENT '',
   `area_code` int COMMENT '',
   `gmt_offset` string COMMENT '',
   `in_dst` string COMMENT '',
   `proxy_type` string COMMENT '',
   `mobile_carrier` string COMMENT '',
   `pop` string COMMENT '',
   `hostname` string COMMENT '',
   `profile_ttl` string COMMENT '',
   `timestamp_iso` string COMMENT '',
   `reference_id` string COMMENT '',
   `identity_organization` string COMMENT '',
   `identity_method` string COMMENT '',
   `mappable_id` string COMMENT '',
   `profile_expires` string COMMENT '',
   `video_player_iframed` int COMMENT '',
   `video_player_in_view` int COMMENT '',
   `video_player_width` int COMMENT '',
   `video_player_height` int COMMENT '',
   `host_domain` string COMMENT '',
   `browser_type` string COMMENT '',
   `browser_device_cat` string COMMENT '',
   `browser_family` string COMMENT '',
   `browser_name` string COMMENT '',
   `browser_version` string COMMENT '',
   `browser_major_version` string COMMENT '',
   `browser_minor_version` string COMMENT '',
   `os_family` string COMMENT '',
   `os_name` string COMMENT '',
   `os_version` string COMMENT '',
   `os_major_version` string COMMENT '',
   `os_minor_version` string COMMENT '')
 PARTITIONED BY (`dt` timestamp)
 STORED AS PARQUET;

Thanks,
Ben


> On Jun 3, 2016, at 8:47 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> 
> hang on are you saving this as a new table?
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>
>  
> http://talebzadehmich.wordpress.com <http://talebzadehmich.wordpress.com/>
>  
> 
> On 3 June 2016 at 14:13, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Does anyone know how to save data in a DataFrame to a table partitioned using 
> an existing column reformatted into a derived column?
> 
> val partitionedDf = df.withColumn("dt", 
> concat(substring($"timestamp", 1, 10), lit(" "), substring($"timestamp", 12, 
> 2), lit(":00")))
> 
> sqlContext.setConf("hive.exec.dynamic.partition", "true")
> sqlContext.setConf("hive.exec.dynamic.partition.mode", 
> "nonstrict")
> partitionedDf.write
> .mode(SaveMode.Append)
> .partitionBy("dt")
> .saveAsTable("ds.amo_bi_events")
> 
> I am getting an ArrayOutOfBounds error. There are 83 columns in the 
> destination table, but after adding the derived column the error refers to 84. 
> I assumed that the column used for the partition would not be counted.
> 
> Can someone please help.

Save to a Partitioned Table using a Derived Column

2016-06-03 Thread Benjamin Kim
Does anyone know how to save data in a DataFrame to a table partitioned using 
an existing column reformatted into a derived column?

val partitionedDf = df.withColumn("dt", 
concat(substring($"timestamp", 1, 10), lit(" "), substring($"timestamp", 12, 
2), lit(":00")))

sqlContext.setConf("hive.exec.dynamic.partition", "true")
sqlContext.setConf("hive.exec.dynamic.partition.mode", 
"nonstrict")
partitionedDf.write
.mode(SaveMode.Append)
.partitionBy("dt")
.saveAsTable("ds.amo_bi_events")

I am getting an ArrayOutOfBounds error. There are 83 columns in the destination 
table, but after adding the derived column the error refers to 84. I assumed 
that the column used for the partition would not be counted.

Can someone please help.

Thanks,
Ben
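
A possible workaround, sketched below against the details in this thread but not
verified: when the Hive table already exists, write with insertInto() instead of
saveAsTable(), enable dynamic partitioning, and make sure the DataFrame columns
are in the table's order with the partition column "dt" last. The table name and
the "dt" derivation come from the messages above; everything else is an
assumption.

import org.apache.spark.sql.functions.{col, concat, lit, substring}
import sqlContext.implicits._

// Derive the partition column exactly as in the original snippet.
val partitionedDf = df.withColumn("dt",
  concat(substring($"timestamp", 1, 10), lit(" "),
         substring($"timestamp", 12, 2), lit(":00")))

sqlContext.setConf("hive.exec.dynamic.partition", "true")
sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

// Reorder the columns to match the existing table, partition column last,
// so the 83 data columns plus "dt" line up with the table definition.
val tableColumns = sqlContext.table("ds.amo_bi_events").columns
val ordered = partitionedDf.select(tableColumns.map(col): _*)

// insertInto() is used without partitionBy(); Hive's dynamic partitioning
// routes each row into its "dt" partition.
ordered.write.mode("append").insertInto("ds.amo_bi_events")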
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming S3 Error

2016-05-21 Thread Benjamin Kim
I got my answer.

The way to access S3 has changed.

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", accessKey)
hadoopConf.set("fs.s3a.secret.key", secretKey)

val lines = ssc.textFileStream("s3a://amg-events-out/")

This worked.

Cheers,
Ben


> On May 21, 2016, at 4:18 AM, Ted Yu <yuzhih...@gmail.com> wrote:
> 
> Maybe more than one version of jets3t-xx.jar was on the classpath.
> 
> FYI
> 
> On Fri, May 20, 2016 at 8:31 PM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> I am trying to stream files from an S3 bucket using CDH 5.7.0’s version of 
> Spark 1.6.0. It seems not to work. I keep getting this error.
> 
> Exception in thread "JobGenerator" java.lang.VerifyError: Bad type on operand 
> stack
> Exception Details:
>   Location:
> 
> org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.copy(Ljava/lang/String;Ljava/lang/String;)V
>  @155: invokevirtual
>   Reason:
> Type 'org/jets3t/service/model/S3Object' (current frame, stack[4]) is not 
> assignable to 'org/jets3t/service/model/StorageObject'
>   Current Frame:
> bci: @155
> flags: { }
> locals: { 'org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore', 
> 'java/lang/String', 'java/lang/String', 'org/jets3t/service/model/S3Object' }
> stack: { 'org/jets3t/service/S3Service', 'java/lang/String', 
> 'java/lang/String', 'java/lang/String', 'org/jets3t/service/model/S3Object', 
> integer }
>   Bytecode:
> 0x000: b200 fcb9 0190 0100 9900 39b2 00fc bb01
> 0x010: 5659 b701 5713 0192 b601 5b2b b601 5b13
> 0x020: 0194 b601 5b2c b601 5b13 0196 b601 5b2a
> 0x030: b400 7db6 00e7 b601 5bb6 015e b901 9802
> 0x040: 002a b400 5799 0030 2ab4 0047 2ab4 007d
> 0x050: 2b01 0101 01b6 019b 4e2a b400 6b09 949e
> 0x060: 0016 2db6 019c 2ab4 006b 949e 000a 2a2d
> 0x070: 2cb6 01a0 b1bb 00a0 592c b700 a14e 2d2a
> 0x080: b400 73b6 00b0 2ab4 0047 2ab4 007d b600
> 0x090: e72b 2ab4 007d b600 e72d 03b6 01a4 57a7
> 0x0a0: 000a 4e2a 2d2b b700 c7b1
>   Exception Handler Table:
> bci [0, 116] => handler: 162
> bci [117, 159] => handler: 162
>   Stackmap Table:
> same_frame_extended(@65)
> same_frame(@117)
> same_locals_1_stack_item_frame(@162,Object[#139])
> same_frame(@169)
> 
> at 
> org.apache.hadoop.fs.s3native.NativeS3FileSystem.createDefaultStore(NativeS3FileSystem.java:338)
> at 
> org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:328)
> at 
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2696)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
> at 
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2733)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2715)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:382)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
> at 
> org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$fs(FileInputDStream.scala:297)
> at 
> org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:198)
> at 
> org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:149)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
> at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> at scala.Option.orElse(Option.scala:257)
> at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
> at 
> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>

Re: Spark Streaming S3 Error

2016-05-21 Thread Benjamin Kim
Ted,

I only see 1 jets3t-0.9.0 jar in the classpath after running this to list the 
jars.

val cl = ClassLoader.getSystemClassLoader
cl.asInstanceOf[java.net.URLClassLoader].getURLs.foreach(println)

/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/jars/jets3t-0.9.0.jar

I don’t know what else could be wrong.

Thanks,
Ben

> On May 21, 2016, at 4:18 AM, Ted Yu <yuzhih...@gmail.com> wrote:
> 
> Maybe more than one version of jets3t-xx.jar was on the classpath.
> 
> FYI
> 
> On Fri, May 20, 2016 at 8:31 PM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> I am trying to stream files from an S3 bucket using CDH 5.7.0’s version of 
> Spark 1.6.0. It seems not to work. I keep getting this error.
> 
> Exception in thread "JobGenerator" java.lang.VerifyError: Bad type on operand 
> stack
> Exception Details:
>   Location:
> 
> org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.copy(Ljava/lang/String;Ljava/lang/String;)V
>  @155: invokevirtual
>   Reason:
> Type 'org/jets3t/service/model/S3Object' (current frame, stack[4]) is not 
> assignable to 'org/jets3t/service/model/StorageObject'
>   Current Frame:
> bci: @155
> flags: { }
> locals: { 'org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore', 
> 'java/lang/String', 'java/lang/String', 'org/jets3t/service/model/S3Object' }
> stack: { 'org/jets3t/service/S3Service', 'java/lang/String', 
> 'java/lang/String', 'java/lang/String', 'org/jets3t/service/model/S3Object', 
> integer }
>   Bytecode:
> 0x000: b200 fcb9 0190 0100 9900 39b2 00fc bb01
> 0x010: 5659 b701 5713 0192 b601 5b2b b601 5b13
> 0x020: 0194 b601 5b2c b601 5b13 0196 b601 5b2a
> 0x030: b400 7db6 00e7 b601 5bb6 015e b901 9802
> 0x040: 002a b400 5799 0030 2ab4 0047 2ab4 007d
> 0x050: 2b01 0101 01b6 019b 4e2a b400 6b09 949e
> 0x060: 0016 2db6 019c 2ab4 006b 949e 000a 2a2d
> 0x070: 2cb6 01a0 b1bb 00a0 592c b700 a14e 2d2a
> 0x080: b400 73b6 00b0 2ab4 0047 2ab4 007d b600
> 0x090: e72b 2ab4 007d b600 e72d 03b6 01a4 57a7
> 0x0a0: 000a 4e2a 2d2b b700 c7b1
>   Exception Handler Table:
> bci [0, 116] => handler: 162
> bci [117, 159] => handler: 162
>   Stackmap Table:
> same_frame_extended(@65)
> same_frame(@117)
> same_locals_1_stack_item_frame(@162,Object[#139])
> same_frame(@169)
> 
> at 
> org.apache.hadoop.fs.s3native.NativeS3FileSystem.createDefaultStore(NativeS3FileSystem.java:338)
> at 
> org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:328)
> at 
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2696)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
> at 
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2733)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2715)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:382)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
> at 
> org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$fs(FileInputDStream.scala:297)
> at 
> org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:198)
> at 
> org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:149)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
> at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> at scala.Option.orElse(Option.scala:257)
> at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
> at 
> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
> at 
> org.apache.spark.

Spark Streaming S3 Error

2016-05-20 Thread Benjamin Kim
I am trying to stream files from an S3 bucket using CDH 5.7.0’s version of 
Spark 1.6.0. It seems not to work. I keep getting this error.

Exception in thread "JobGenerator" java.lang.VerifyError: Bad type on operand 
stack
Exception Details:
  Location:

org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.copy(Ljava/lang/String;Ljava/lang/String;)V
 @155: invokevirtual
  Reason:
Type 'org/jets3t/service/model/S3Object' (current frame, stack[4]) is not 
assignable to 'org/jets3t/service/model/StorageObject'
  Current Frame:
bci: @155
flags: { }
locals: { 'org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore', 
'java/lang/String', 'java/lang/String', 'org/jets3t/service/model/S3Object' }
stack: { 'org/jets3t/service/S3Service', 'java/lang/String', 
'java/lang/String', 'java/lang/String', 'org/jets3t/service/model/S3Object', 
integer }
  Bytecode:
0x000: b200 fcb9 0190 0100 9900 39b2 00fc bb01
0x010: 5659 b701 5713 0192 b601 5b2b b601 5b13
0x020: 0194 b601 5b2c b601 5b13 0196 b601 5b2a
0x030: b400 7db6 00e7 b601 5bb6 015e b901 9802
0x040: 002a b400 5799 0030 2ab4 0047 2ab4 007d
0x050: 2b01 0101 01b6 019b 4e2a b400 6b09 949e
0x060: 0016 2db6 019c 2ab4 006b 949e 000a 2a2d
0x070: 2cb6 01a0 b1bb 00a0 592c b700 a14e 2d2a
0x080: b400 73b6 00b0 2ab4 0047 2ab4 007d b600
0x090: e72b 2ab4 007d b600 e72d 03b6 01a4 57a7
0x0a0: 000a 4e2a 2d2b b700 c7b1   
  Exception Handler Table:
bci [0, 116] => handler: 162
bci [117, 159] => handler: 162
  Stackmap Table:
same_frame_extended(@65)
same_frame(@117)
same_locals_1_stack_item_frame(@162,Object[#139])
same_frame(@169)

at 
org.apache.hadoop.fs.s3native.NativeS3FileSystem.createDefaultStore(NativeS3FileSystem.java:338)
at 
org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:328)
at 
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2696)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at 
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2733)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2715)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:382)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at 
org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$fs(FileInputDStream.scala:297)
at 
org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:198)
at 
org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:149)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:257)
at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at 
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:257)
at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at 
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
at 

Re: Structured Streaming in Spark 2.0 and DStreams

2016-05-16 Thread Benjamin Kim
I have a curiosity question. These forever/unlimited DataFrames/DataSets will 
persist and be query capable. I am still foggy about how this data will be 
stored. As far as I know, memory is finite. Will the data be spilled to disk 
and be retrievable if the query spans data not in memory? Is Tachyon (Alluxio), 
HDFS (Parquet), NoSQL (HBase, Cassandra), RDBMS (PostgreSQL, MySQL), Object 
Store (S3, Swift), or anything else I can’t think of going to be the underlying 
near real-time storage system?

Thanks,
Ben

> On May 15, 2016, at 3:36 PM, Yuval Itzchakov  wrote:
> 
> Hi Ofir,
> Thanks for the elaborated answer. I have read both documents, where they do a 
> light touch on infinite Dataframes/Datasets. However, they do not go in depth 
> as regards to how existing transformations on DStreams, for example, will be 
> transformed into the Dataset APIs. I've been browsing the 2.0 branch and have 
> yet been able to understand how they correlate.
> 
> Also, placing SparkSession in the sql package seems like a peculiar choice, 
> since this is going to be the global abstraction over 
> SparkContext/StreamingContext from now on.
> 
> On Sun, May 15, 2016, 23:42 Ofir Manor  > wrote:
> Hi Yuval,
> let me share my understanding based on similar questions I had.
> First, Spark 2.x aims to replace a whole bunch of its APIs with just two main 
> ones - SparkSession (replacing Hive/SQL/Spark Context) and Dataset (merging 
> of Dataset and Dataframe - which is why it inherits all the SparkSQL 
> goodness), while RDD seems as a low-level API only for special cases. The new 
> Dataset should also support both batch and streaming - replacing (eventually) 
> DStream as well. See the design docs in SPARK-13485 (unified API) and 
> SPARK-8360 (StructuredStreaming) for a good intro. 
> However, as you noted, not all will be fully delivered in 2.0. For example, 
> it seems that streaming from / to Kafka using StructuredStreaming didn't make 
> it (so far?) to 2.0 (which is a showstopper for me). 
> Anyway, as far as I understand, you should be able to apply stateful 
> operators (non-RDD) on Datasets (for example, the new event-time window 
> processing SPARK-8360). The gap I see is mostly limited streaming sources / 
> sinks migrated to the new (richer) API and semantics.
> Anyway, I'm pretty sure once 2.0 gets to RC, the documentation and examples 
> will align with the current offering...
> 
> 
> Ofir Manor
> 
> Co-Founder & CTO | Equalum
> 
> 
> Mobile: +972-54-7801286  | Email: 
> ofir.ma...@equalum.io 
> On Sun, May 15, 2016 at 1:52 PM, Yuval.Itzchakov  > wrote:
> I've been reading/watching videos about the upcoming Spark 2.0 release which
> brings us Structured Streaming. One thing I've yet to understand is how this
> relates to the current state of working with Streaming in Spark with the
> DStream abstraction.
> 
> All examples I can find, in the Spark repository/different videos is someone
> streaming local JSON files or reading from HDFS/S3/SQL. Also, when browsing
> the source, SparkSession seems to be defined inside org.apache.spark.sql, so
> this gives me a hunch that this is somehow all related to SQL and the likes,
> and not really to DStreams.
> 
> What I'm failing to understand is: Will this feature impact how we do
> Streaming today? Will I be able to consume a Kafka source in a streaming
> fashion (like we do today when we open a stream using KafkaUtils)? Will we
> be able to do state-full operations on a Dataset[T] like we do today using
> MapWithStateRDD? Or will there be a subset of operations that the catalyst
> optimizer can understand such as aggregate and such?
> 
> I'd be happy anyone could shed some light on this.
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Structured-Streaming-in-Spark-2-0-and-DStreams-tp26959.html
>  
> 
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 



Re: Structured Streaming in Spark 2.0 and DStreams

2016-05-15 Thread Benjamin Kim
Ofir,

Thanks for the clarification. I was confused for the moment. The links will be 
very helpful.


> On May 15, 2016, at 2:32 PM, Ofir Manor <ofir.ma...@equalum.io> wrote:
> 
> Ben,
> I'm just a Spark user - but at least in March Spark Summit, that was the main 
> term used.
> Taking a step back from the details, maybe this new post from Reynold is a 
> better intro to Spark 2.0 highlights 
> https://databricks.com/blog/2016/05/11/spark-2-0-technical-preview-easier-faster-and-smarter.html
>  
> <https://databricks.com/blog/2016/05/11/spark-2-0-technical-preview-easier-faster-and-smarter.html>
> 
> If you want to drill down, go to SPARK-8360 "Structured Streaming (aka 
> Streaming DataFrames)". The design doc (written by Reynold in March) is very 
> readable:
>  https://issues.apache.org/jira/browse/SPARK-8360 
> <https://issues.apache.org/jira/browse/SPARK-8360>
> 
> Regarding directly querying (SQL) the state managed by a streaming process - 
> I don't know if that will land in 2.0 or only later.
> 
> Hope that helps,
> 
> Ofir Manor
> 
> Co-Founder & CTO | Equalum
> 
> 
> Mobile: +972-54-7801286 <tel:%2B972-54-7801286> | Email: 
> ofir.ma...@equalum.io <mailto:ofir.ma...@equalum.io>
> On Sun, May 15, 2016 at 11:58 PM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Hi Ofir,
> 
> I just recently saw the webinar with Reynold Xin. He mentioned the Spark 
> Session unification efforts, but I don’t remember the DataSet for Structured 
> Streaming aka Continuous Applications as he put it. He did mention streaming 
> or unlimited DataFrames for Structured Streaming so one can directly query 
> the data from it. Has something changed since then?
> 
> Thanks,
> Ben
> 
> 
>> On May 15, 2016, at 1:42 PM, Ofir Manor <ofir.ma...@equalum.io 
>> <mailto:ofir.ma...@equalum.io>> wrote:
>> 
>> Hi Yuval,
>> let me share my understanding based on similar questions I had.
>> First, Spark 2.x aims to replace a whole bunch of its APIs with just two 
>> main ones - SparkSession (replacing Hive/SQL/Spark Context) and Dataset 
>> (merging of Dataset and Dataframe - which is why it inherits all the 
>> SparkSQL goodness), while RDD seems as a low-level API only for special 
>> cases. The new Dataset should also support both batch and streaming - 
>> replacing (eventually) DStream as well. See the design docs in SPARK-13485 
>> (unified API) and SPARK-8360 (StructuredStreaming) for a good intro. 
>> However, as you noted, not all will be fully delivered in 2.0. For example, 
>> it seems that streaming from / to Kafka using StructuredStreaming didn't 
>> make it (so far?) to 2.0 (which is a showstopper for me). 
>> Anyway, as far as I understand, you should be able to apply stateful 
>> operators (non-RDD) on Datasets (for example, the new event-time window 
>> processing SPARK-8360). The gap I see is mostly limited streaming sources / 
>> sinks migrated to the new (richer) API and semantics.
>> Anyway, I'm pretty sure once 2.0 gets to RC, the documentation and examples 
>> will align with the current offering...
>> 
>> 
>> Ofir Manor
>> 
>> Co-Founder & CTO | Equalum
>> 
>> 
>> Mobile: +972-54-7801286 <tel:%2B972-54-7801286> | Email: 
>> ofir.ma...@equalum.io <mailto:ofir.ma...@equalum.io>
>> On Sun, May 15, 2016 at 1:52 PM, Yuval.Itzchakov <yuva...@gmail.com 
>> <mailto:yuva...@gmail.com>> wrote:
>> I've been reading/watching videos about the upcoming Spark 2.0 release which
>> brings us Structured Streaming. One thing I've yet to understand is how this
>> relates to the current state of working with Streaming in Spark with the
>> DStream abstraction.
>> 
>> All examples I can find, in the Spark repository/different videos is someone
>> streaming local JSON files or reading from HDFS/S3/SQL. Also, when browsing
>> the source, SparkSession seems to be defined inside org.apache.spark.sql, so
>> this gives me a hunch that this is somehow all related to SQL and the likes,
>> and not really to DStreams.
>> 
>> What I'm failing to understand is: Will this feature impact how we do
>> Streaming today? Will I be able to consume a Kafka source in a streaming
>> fashion (like we do today when we open a stream using KafkaUtils)? Will we
>> be able to do state-full operations on a Dataset[T] like we do today using
>> MapWithStateRDD? Or will there be a subset of operations that the catalyst
>> optimizer can understand such as aggregate and such?
>> 
>> I'd be happy anyone could s

Re: Structured Streaming in Spark 2.0 and DStreams

2016-05-15 Thread Benjamin Kim
Hi Ofir,

I just recently saw the webinar with Reynold Xin. He mentioned the Spark 
Session unification efforts, but I don’t remember the DataSet for Structured 
Streaming aka Continuous Applications as he put it. He did mention streaming or 
unlimited DataFrames for Structured Streaming so one can directly query the 
data from it. Has something changed since then?

Thanks,
Ben


> On May 15, 2016, at 1:42 PM, Ofir Manor  wrote:
> 
> Hi Yuval,
> let me share my understanding based on similar questions I had.
> First, Spark 2.x aims to replace a whole bunch of its APIs with just two main 
> ones - SparkSession (replacing Hive/SQL/Spark Context) and Dataset (merging 
> of Dataset and Dataframe - which is why it inherits all the SparkSQL 
> goodness), while RDD seems as a low-level API only for special cases. The new 
> Dataset should also support both batch and streaming - replacing (eventually) 
> DStream as well. See the design docs in SPARK-13485 (unified API) and 
> SPARK-8360 (StructuredStreaming) for a good intro. 
> However, as you noted, not all will be fully delivered in 2.0. For example, 
> it seems that streaming from / to Kafka using StructuredStreaming didn't make 
> it (so far?) to 2.0 (which is a showstopper for me). 
> Anyway, as far as I understand, you should be able to apply stateful 
> operators (non-RDD) on Datasets (for example, the new event-time window 
> processing SPARK-8360). The gap I see is mostly limited streaming sources / 
> sinks migrated to the new (richer) API and semantics.
> Anyway, I'm pretty sure once 2.0 gets to RC, the documentation and examples 
> will align with the current offering...
> 
> 
> Ofir Manor
> 
> Co-Founder & CTO | Equalum
> 
> 
> Mobile: +972-54-7801286  | Email: 
> ofir.ma...@equalum.io 
> On Sun, May 15, 2016 at 1:52 PM, Yuval.Itzchakov  > wrote:
> I've been reading/watching videos about the upcoming Spark 2.0 release which
> brings us Structured Streaming. One thing I've yet to understand is how this
> relates to the current state of working with Streaming in Spark with the
> DStream abstraction.
> 
> All examples I can find, in the Spark repository/different videos is someone
> streaming local JSON files or reading from HDFS/S3/SQL. Also, when browsing
> the source, SparkSession seems to be defined inside org.apache.spark.sql, so
> this gives me a hunch that this is somehow all related to SQL and the likes,
> and not really to DStreams.
> 
> What I'm failing to understand is: Will this feature impact how we do
> Streaming today? Will I be able to consume a Kafka source in a streaming
> fashion (like we do today when we open a stream using KafkaUtils)? Will we
> be able to do state-full operations on a Dataset[T] like we do today using
> MapWithStateRDD? Or will there be a subset of operations that the catalyst
> optimizer can understand such as aggregate and such?
> 
> I'd be happy anyone could shed some light on this.
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Structured-Streaming-in-Spark-2-0-and-DStreams-tp26959.html
>  
> 
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 
> 



Re: Save DataFrame to HBase

2016-05-10 Thread Benjamin Kim
Ted,

Will the hbase-spark module allow for creating tables in Spark SQL that 
reference the hbase tables underneath? In this way, users can query using just 
SQL.

Thanks,
Ben

> On Apr 28, 2016, at 3:09 AM, Ted Yu <yuzhih...@gmail.com> wrote:
> 
> Hbase 2.0 release likely would come after Spark 2.0 release. 
> 
> There're other features being developed in hbase 2.0
> I am not sure when hbase 2.0 would be released. 
> 
> The refguide is incomplete. 
> Zhan has assigned the doc JIRA to himself. The documentation would be done 
> after fixing bugs in hbase-spark module. 
> 
> Cheers
> 
> On Apr 27, 2016, at 10:31 PM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> 
>> Hi Ted,
>> 
>> Do you know when the release will be? I also see some documentation for 
>> usage of the hbase-spark module at the hbase website. But, I don’t see an 
>> example on how to save data. There is only one for reading/querying data. 
>> Will this be added when the final version does get released?
>> 
>> Thanks,
>> Ben
>> 
>>> On Apr 21, 2016, at 6:56 AM, Ted Yu <yuzhih...@gmail.com 
>>> <mailto:yuzhih...@gmail.com>> wrote:
>>> 
>>> The hbase-spark module in Apache HBase (coming with hbase 2.0 release) can 
>>> do this.
>>> 
>>> On Thu, Apr 21, 2016 at 6:52 AM, Benjamin Kim <bbuil...@gmail.com 
>>> <mailto:bbuil...@gmail.com>> wrote:
>>> Has anyone found an easy way to save a DataFrame into HBase?
>>> 
>>> Thanks,
>>> Ben
>>> 
>>> 
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
>>> <mailto:user-unsubscr...@spark.apache.org>
>>> For additional commands, e-mail: user-h...@spark.apache.org 
>>> <mailto:user-h...@spark.apache.org>
>>> 
>>> 
>> 



Re: Spark 2.0 Release Date

2016-04-28 Thread Benjamin Kim
Next Thursday is Databricks' webinar on Spark 2.0. If you are attending, I bet 
many are going to ask when the release will be. Last time they did this, Spark 
1.6 came out not too long afterward.

> On Apr 28, 2016, at 5:21 AM, Sean Owen  wrote:
> 
> I don't know if anyone has begun a firm discussion on dates, but there
> are >100 open issues and ~10 blockers, so still some work to do before
> code freeze, it looks like. My unofficial guess is mid June before
> it's all done.
> 
> On Thu, Apr 28, 2016 at 12:43 PM, Arun Patel  wrote:
>> A small request.
>> 
>> Would you mind providing an approximate date of Spark 2.0 release?  Is it
>> early May or Mid May or End of May?
>> 
>> Thanks,
>> Arun
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark 2.0+ Structured Streaming

2016-04-28 Thread Benjamin Kim
Can someone explain to me how the new Structured Streaming works in the 
upcoming Spark 2.0+? I’m a little hazy on how the data will be stored and 
referenced if it can be queried and/or batch processed directly from streams, 
and whether the data will be append-only or there will be some sort of upsert 
capability available. This almost sounds similar to what AWS Kinesis is trying 
to achieve, but it can only store the data for 24 hours. Am I close?

Thanks,
Ben
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Save DataFrame to HBase

2016-04-27 Thread Benjamin Kim
Hi Ted,

Do you know when the release will be? I also see some documentation for usage 
of the hbase-spark module at the hbase website. But, I don’t see an example on 
how to save data. There is only one for reading/querying data. Will this be 
added when the final version does get released?

Thanks,
Ben

> On Apr 21, 2016, at 6:56 AM, Ted Yu <yuzhih...@gmail.com> wrote:
> 
> The hbase-spark module in Apache HBase (coming with hbase 2.0 release) can do 
> this.
> 
> On Thu, Apr 21, 2016 at 6:52 AM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Has anyone found an easy way to save a DataFrame into HBase?
> 
> Thanks,
> Ben
> 
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> <mailto:user-unsubscr...@spark.apache.org>
> For additional commands, e-mail: user-h...@spark.apache.org 
> <mailto:user-h...@spark.apache.org>
> 
> 



Re: Save DataFrame to HBase

2016-04-27 Thread Benjamin Kim
Daniel,

If you can get the code snippet, that would be great! I’ve been trying to get 
it to work as well, but the examples on the Phoenix website do not work for me. 
If you are willing, can you also include your setup for making Phoenix work 
with Spark?

Thanks,
Ben
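
For reference, the DataFrame write path documented by the phoenix-spark
integration looks roughly like the sketch below; the table name and ZooKeeper
quorum are placeholders, and the phoenix-spark client jar has to be on the
driver and executor classpaths. Whether this behaves on CDH 5.5.2's Phoenix
4.5.2 is exactly the open question in this thread, so treat it as a shape to
try rather than a confirmed fix.

import org.apache.spark.sql.SaveMode

df.write
  .format("org.apache.phoenix.spark")
  .mode(SaveMode.Overwrite)            // Phoenix upserts; Overwrite is the documented mode
  .option("table", "OUTPUT_TABLE")     // placeholder Phoenix table name
  .option("zkUrl", "zk-host:2181")     // placeholder ZooKeeper quorum
  .save()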

> On Apr 27, 2016, at 11:46 AM, Paras sachdeva <paras.sachdeva11...@gmail.com> 
> wrote:
> 
> Hi Daniel,
> 
> Would you possibly be able to share the snipped to code you have used ?
> 
> Thank you.
> 
> On Wed, Apr 27, 2016 at 3:13 PM, Daniel Haviv 
> <daniel.ha...@veracity-group.com <mailto:daniel.ha...@veracity-group.com>> 
> wrote:
> Hi Benjamin,
> Yes it should work.
> 
> Let me know if you need further assistance I might be able to get the code 
> I've used for that project.
> 
> Thank you.
> Daniel
> 
> On 24 Apr 2016, at 17:35, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> 
>> Hi Daniel,
>> 
>> How did you get the Phoenix plugin to work? I have CDH 5.5.2 installed which 
>> comes with HBase 1.0.0 and Phoenix 4.5.2. Do you think this will work?
>> 
>> Thanks,
>> Ben
>> 
>>> On Apr 24, 2016, at 1:43 AM, Daniel Haviv <daniel.ha...@veracity-group.com 
>>> <mailto:daniel.ha...@veracity-group.com>> wrote:
>>> 
>>> Hi,
>>> I tried saving DF to HBase using a hive table with hbase storage handler 
>>> and hiveContext but it failed due to a bug.
>>> 
>>> I was able to persist the DF to hbase using Apache Pheonix which was pretty 
>>> simple.
>>> 
>>> Thank you.
>>> Daniel
>>> 
>>> On 21 Apr 2016, at 16:52, Benjamin Kim <bbuil...@gmail.com 
>>> <mailto:bbuil...@gmail.com>> wrote:
>>> 
>>>> Has anyone found an easy way to save a DataFrame into HBase?
>>>> 
>>>> Thanks,
>>>> Ben
>>>> 
>>>> 
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
>>>> <mailto:user-unsubscr...@spark.apache.org>
>>>> For additional commands, e-mail: user-h...@spark.apache.org 
>>>> <mailto:user-h...@spark.apache.org>
>>>> 
>> 
> 



Convert DataFrame to Array of Arrays

2016-04-24 Thread Benjamin Kim
I have data in a DataFrame loaded from a CSV file. I need to load this data 
into HBase using an RDD formatted in a certain way.

val rdd = sc.parallelize(
Array(key1,
(ColumnFamily, ColumnName1, Value1),
(ColumnFamily, ColumnName2, Value2),
(…),
key2,
(ColumnFamily, ColumnName1, Value1),
(ColumnFamily, ColumnName2, Value2),
(…),
…)
)

Can someone help me to iterate through each column in a row of data to build 
such an Array structure?

Thanks,
Ben
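
One way to build that structure, sketched here under two assumptions that are
not stated in the thread: the first DataFrame column holds the HBase row key,
and a single column family is used.

val columnFamily = "cf"                  // assumed column family name
val valueColumns = df.columns.drop(1)    // every column after the row key

// For each row, produce (rowKey, Seq((family, qualifier, value), ...)).
val hbaseReadyRdd = df.rdd.map { row =>
  val rowKey = row.get(0).toString
  val cells = valueColumns.map { name =>
    val value = row.getAs[Any](name)
    (columnFamily, name, if (value == null) null else value.toString)
  }
  (rowKey, cells.toSeq)
}

The exact shape the HBase write path expects may differ (it may want byte
arrays, for example), so treat this only as a starting point.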
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Save DataFrame to HBase

2016-04-24 Thread Benjamin Kim
Hi Daniel,

How did you get the Phoenix plugin to work? I have CDH 5.5.2 installed which 
comes with HBase 1.0.0 and Phoenix 4.5.2. Do you think this will work?

Thanks,
Ben

> On Apr 24, 2016, at 1:43 AM, Daniel Haviv <daniel.ha...@veracity-group.com> 
> wrote:
> 
> Hi,
> I tried saving DF to HBase using a hive table with hbase storage handler and 
> hiveContext but it failed due to a bug.
> 
> I was able to persist the DF to hbase using Apache Pheonix which was pretty 
> simple.
> 
> Thank you.
> Daniel
> 
> On 21 Apr 2016, at 16:52, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> 
>> Has anyone found an easy way to save a DataFrame into HBase?
>> 
>> Thanks,
>> Ben
>> 
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
>> <mailto:user-unsubscr...@spark.apache.org>
>> For additional commands, e-mail: user-h...@spark.apache.org 
>> <mailto:user-h...@spark.apache.org>
>> 



Re: Save DataFrame to HBase

2016-04-21 Thread Benjamin Kim
Hi Ted,

Can this module be used with an older version of HBase, such as 1.0 or 1.1? 
Where can I get the module from?

Thanks,
Ben

> On Apr 21, 2016, at 6:56 AM, Ted Yu <yuzhih...@gmail.com> wrote:
> 
> The hbase-spark module in Apache HBase (coming with hbase 2.0 release) can do 
> this.
> 
> On Thu, Apr 21, 2016 at 6:52 AM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Has anyone found an easy way to save a DataFrame into HBase?
> 
> Thanks,
> Ben
> 
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> <mailto:user-unsubscr...@spark.apache.org>
> For additional commands, e-mail: user-h...@spark.apache.org 
> <mailto:user-h...@spark.apache.org>
> 
> 



Save DataFrame to HBase

2016-04-21 Thread Benjamin Kim
Has anyone found an easy way to save a DataFrame into HBase?

Thanks,
Ben


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



HBase Spark Module

2016-04-20 Thread Benjamin Kim
I see that the new CDH 5.7 has been released with the HBase Spark module 
built-in. I was wondering if I could just download it and use the hbase-spark 
jar file for CDH 5.5. Has anyone tried this yet?

Thanks,
Ben
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: JSON Usage

2016-04-17 Thread Benjamin Kim
Hyukjin,

This is what I have done so far. I haven’t used a Dataset yet; maybe I don’t need to.

var df: DataFrame = null
for(message <- messages) {
val bodyRdd = sc.parallelize(message.getBody() :: Nil)
val fileDf = sqlContext.read.json(bodyRdd)
.select(
$"Records.s3.bucket.name".as("bucket"),
$"Records.s3.object.key".as("key")
)
if (df != null) {
  df = df.unionAll(fileDf)
} else {
  df = fileDf
}
}
df.show

Each result is returned as an array. I just need to concatenate each bucket/key 
pair into an S3 URL and download the file at each URL. This is what I need help 
with next.

Thanks,
Ben
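
A sketch of that next step, assuming the "bucket" and "key" columns hold
parallel arrays (as the select above suggests) and that the fs.s3a credentials
are already configured; none of this is tested code from the thread.

val s3Urls = df.collect().flatMap { row =>
  val buckets = row.getSeq[String](row.fieldIndex("bucket"))
  val keys    = row.getSeq[String](row.fieldIndex("key"))
  buckets.zip(keys).map { case (b, k) => s"s3a://$b/$k" }  // one URL per Record
}

// Read each referenced CSV with spark-csv; union the results if a single
// DataFrame is wanted.
val csvDfs = s3Urls.map { url =>
  sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(url)
}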

> On Apr 17, 2016, at 7:38 AM, Hyukjin Kwon <gurwls...@gmail.com> wrote:
> 
> Hi!
> 
> Personally, I don't think it necessarily needs to be DataSet for your goal.
> 
> Just select your data at "s3" from DataFrame loaded by sqlContext.read.json().
> 
> You can try to printSchema() to check the nested schema and then select the 
> data.
> 
> Also, I guess (from your codes) you are trying to send a reauest, fetch the 
> response to driver-side, and then send each message to executor-side. I guess 
> there would be really heavy overhead in driver-side.
> Holden,
> 
> If I were to use DataSets, then I would essentially do this:
> 
> val receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl)
> val messages = sqs.receiveMessage(receiveMessageRequest).getMessages()
> for (message <- messages.asScala) {
> val files = sqlContext.read.json(message.getBody())
> }
> 
> Can I simply do files.toDS() or do I have to create a schema using a case 
> class File and apply it as[File]? If I have to apply a schema, then how would 
> I create it based on the JSON structure below, especially the nested elements.
> 
> Thanks,
> Ben
> 
> 
>> On Apr 14, 2016, at 3:46 PM, Holden Karau <hol...@pigscanfly.ca 
>> <mailto:hol...@pigscanfly.ca>> wrote:
>> 
>> You could certainly use RDDs for that, you might also find using Dataset 
>> selecting the fields you need to construct the URL to fetch and then using 
>> the map function to be easier.
>> 
>> On Thu, Apr 14, 2016 at 12:01 PM, Benjamin Kim <bbuil...@gmail.com 
>> <mailto:bbuil...@gmail.com>> wrote:
>> I was wondering what would be the best way to use JSON in Spark/Scala. I need 
>> to lookup values of fields in a collection of records to form a URL and 
>> download that file at that location. I was thinking an RDD would be perfect 
>> for this. I just want to hear from others who might have more experience in 
>> this. Below is the actual JSON structure that I am trying to use for the S3 
>> bucket and key values of each “record" within “Records".
>> 
>> {
>>"Records":[
>>   {
>>  "eventVersion":"2.0",
>>  "eventSource":"aws:s3",
>>  "awsRegion":"us-east-1",
>>  "eventTime":The time, in ISO-8601 format, for example, 
>> 1970-01-01T00:00:00.000Z, when S3 finished processing the request,
>>  "eventName":"event-type",
>>  "userIdentity":{
>> 
>> "principalId":"Amazon-customer-ID-of-the-user-who-caused-the-event"
>>  },
>>  "requestParameters":{
>> "sourceIPAddress":"ip-address-where-request-came-from"
>>  },
>>  "responseElements":{
>> "x-amz-request-id":"Amazon S3 generated request ID",
>> "x-amz-id-2":"Amazon S3 host that processed the request"
>>  },
>>  "s3":{
>> "s3SchemaVersion":"1.0",
>> "configurationId":"ID found in the bucket notification 
>> configuration",
>> "bucket":{
>>"name":"bucket-name",
>>"ownerIdentity":{
>>   "principalId":"Amazon-customer-ID-of-the-bucket-owner"
>>},
>>"arn":"bucket-ARN"
>> },
>> "object":{
>>"key":"object-key",
>>"size":object-size,
>>"eTag":"object eTag",
>>"versionId":"object version if bucket is versioning-enabled, 
>> otherwise null",
>>"sequencer": "a string representation of a hexadecimal value 
>> used to determine event sequence,
>>only used with PUTs and DELETEs"
>> }
>>  }
>>   },
>>   {
>>   // Additional events
>>   }
>>]
>> }
>> 
>> Thanks
>> Ben
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
>> <mailto:user-unsubscr...@spark.apache.org>
>> For additional commands, e-mail: user-h...@spark.apache.org 
>> <mailto:user-h...@spark.apache.org>
>> 
>> 
>> 
>> 
>> -- 
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau <https://twitter.com/holdenkarau>



Re: can spark-csv package accept strings instead of files?

2016-04-15 Thread Benjamin Kim
Thanks!

I got this to work.

val csvRdd = sc.parallelize(data.split("\n"))
val df = new 
com.databricks.spark.csv.CsvParser().withUseHeader(true).withInferSchema(true).csvRdd(sqlContext,
 csvRdd)

> On Apr 15, 2016, at 1:14 PM, Hyukjin Kwon <gurwls...@gmail.com> wrote:
> 
> Hi,
> 
> Would you try this codes below?
> 
> val csvRDD = ...your processimg for csv rdd..
> val df = new CsvParser().csvRdd(sqlContext, csvRDD, useHeader = true)
> 
> Thanks!
> 
> On 16 Apr 2016 1:35 a.m., "Benjamin Kim" <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Hi Hyukjin,
> 
> I saw that. I don’t know how to use it. I’m still learning Scala on my own. 
> Can you help me to start?
> 
> Thanks,
> Ben
> 
>> On Apr 15, 2016, at 8:02 AM, Hyukjin Kwon <gurwls...@gmail.com 
>> <mailto:gurwls...@gmail.com>> wrote:
>> 
>> I hope it was not too late :).
>> 
>> It is possible.
>> 
>> Please check csvRdd api here, 
>> https://github.com/databricks/spark-csv/blob/master/src/main/scala/com/databricks/spark/csv/CsvParser.scala#L150
>>  
>> <https://github.com/databricks/spark-csv/blob/master/src/main/scala/com/databricks/spark/csv/CsvParser.scala#L150>.
>> 
>> Thanks!
>> 
>> On 2 Apr 2016 2:47 a.m., "Benjamin Kim" <bbuil...@gmail.com 
>> <mailto:bbuil...@gmail.com>> wrote:
>> Does anyone know if this is possible? I have an RDD loaded with rows of CSV 
>> data strings. Each string representing the header row and multiple rows of 
>> data along with delimiters. I would like to feed each thru a CSV parser to 
>> convert the data into a dataframe and, ultimately, UPSERT a Hive/HBase table 
>> with this data.
>> 
>> Please let me know if you have any ideas.
>> 
>> Thanks,
>> Ben
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
>> <mailto:user-unsubscr...@spark.apache.org>
>> For additional commands, e-mail: user-h...@spark.apache.org 
>> <mailto:user-h...@spark.apache.org>
>> 
> 



Re: can spark-csv package accept strings instead of files?

2016-04-15 Thread Benjamin Kim
Is this right?


import com.databricks.spark.csv

val csvRdd = data.flatMap(x => x.split("\n"))
val df = new CsvParser().csvRdd(sqlContext, csvRdd, useHeader = true)

Thanks,
Ben


> On Apr 15, 2016, at 1:14 PM, Hyukjin Kwon <gurwls...@gmail.com> wrote:
> 
> Hi,
> 
> Would you try this codes below?
> 
> val csvRDD = ...your processimg for csv rdd..
> val df = new CsvParser().csvRdd(sqlContext, csvRDD, useHeader = true)
> 
> Thanks!
> 
> On 16 Apr 2016 1:35 a.m., "Benjamin Kim" <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Hi Hyukjin,
> 
> I saw that. I don’t know how to use it. I’m still learning Scala on my own. 
> Can you help me to start?
> 
> Thanks,
> Ben
> 
>> On Apr 15, 2016, at 8:02 AM, Hyukjin Kwon <gurwls...@gmail.com 
>> <mailto:gurwls...@gmail.com>> wrote:
>> 
>> I hope it was not too late :).
>> 
>> It is possible.
>> 
>> Please check csvRdd api here, 
>> https://github.com/databricks/spark-csv/blob/master/src/main/scala/com/databricks/spark/csv/CsvParser.scala#L150
>>  
>> <https://github.com/databricks/spark-csv/blob/master/src/main/scala/com/databricks/spark/csv/CsvParser.scala#L150>.
>> 
>> Thanks!
>> 
>> On 2 Apr 2016 2:47 a.m., "Benjamin Kim" <bbuil...@gmail.com 
>> <mailto:bbuil...@gmail.com>> wrote:
>> Does anyone know if this is possible? I have an RDD loaded with rows of CSV 
>> data strings. Each string representing the header row and multiple rows of 
>> data along with delimiters. I would like to feed each thru a CSV parser to 
>> convert the data into a dataframe and, ultimately, UPSERT a Hive/HBase table 
>> with this data.
>> 
>> Please let me know if you have any ideas.
>> 
>> Thanks,
>> Ben
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
>> <mailto:user-unsubscr...@spark.apache.org>
>> For additional commands, e-mail: user-h...@spark.apache.org 
>> <mailto:user-h...@spark.apache.org>
>> 
> 



Re: JSON Usage

2016-04-15 Thread Benjamin Kim
Holden,

If I were to use DataSets, then I would essentially do this:

val receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl)
val messages = sqs.receiveMessage(receiveMessageRequest).getMessages()
for (message <- messages.asScala) {
val files = sqlContext.read.json(message.getBody())
}

Can I simply do files.toDS() or do I have to create a schema using a case class 
File and apply it as[File]? If I have to apply a schema, then how would I 
create it based on the JSON structure below, especially the nested elements.

Thanks,
Ben
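
For what it is worth, one route that avoids modelling the whole nested schema
is to flatten the JSON with select() first and then bind a small case class;
the names below are chosen for illustration and do not come from the thread.

case class S3File(bucket: String, key: String)

import org.apache.spark.sql.functions.explode
import sqlContext.implicits._

// bodyRdd is assumed to be an RDD[String] of raw message bodies.
val files = sqlContext.read.json(bodyRdd)
  .select(explode($"Records").as("r"))               // one row per Record
  .select($"r.s3.bucket.name".as("bucket"),
          $"r.s3.object.key".as("key"))
  .as[S3File]                                        // typed Dataset[S3File]

Whether a full set of nested case classes mirroring the JSON works as smoothly
on Spark 1.6's experimental Dataset API is something to verify separately.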


> On Apr 14, 2016, at 3:46 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
> 
> You could certainly use RDDs for that, you might also find using Dataset 
> selecting the fields you need to construct the URL to fetch and then using 
> the map function to be easier.
> 
> On Thu, Apr 14, 2016 at 12:01 PM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> I was wondering what would be the best way to use JSON in Spark/Scala. I need to 
> lookup values of fields in a collection of records to form a URL and download 
> that file at that location. I was thinking an RDD would be perfect for this. 
> I just want to hear from others who might have more experience in this. Below 
> is the actual JSON structure that I am trying to use for the S3 bucket and 
> key values of each “record" within “Records".
> 
> {
>"Records":[
>   {
>  "eventVersion":"2.0",
>  "eventSource":"aws:s3",
>  "awsRegion":"us-east-1",
>  "eventTime":The time, in ISO-8601 format, for example, 
> 1970-01-01T00:00:00.000Z, when S3 finished processing the request,
>  "eventName":"event-type",
>  "userIdentity":{
> 
> "principalId":"Amazon-customer-ID-of-the-user-who-caused-the-event"
>  },
>  "requestParameters":{
> "sourceIPAddress":"ip-address-where-request-came-from"
>  },
>  "responseElements":{
> "x-amz-request-id":"Amazon S3 generated request ID",
> "x-amz-id-2":"Amazon S3 host that processed the request"
>  },
>  "s3":{
> "s3SchemaVersion":"1.0",
> "configurationId":"ID found in the bucket notification 
> configuration",
> "bucket":{
>"name":"bucket-name",
>"ownerIdentity":{
>   "principalId":"Amazon-customer-ID-of-the-bucket-owner"
>},
>"arn":"bucket-ARN"
> },
> "object":{
>"key":"object-key",
>"size":object-size,
>"eTag":"object eTag",
>"versionId":"object version if bucket is versioning-enabled, 
> otherwise null",
>"sequencer": "a string representation of a hexadecimal value 
> used to determine event sequence,
>only used with PUTs and DELETEs"
> }
>  }
>   },
>   {
>   // Additional events
>   }
>]
> }
> 
> Thanks
> Ben
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> <mailto:user-unsubscr...@spark.apache.org>
> For additional commands, e-mail: user-h...@spark.apache.org 
> <mailto:user-h...@spark.apache.org>
> 
> 
> 
> 
> -- 
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau <https://twitter.com/holdenkarau>


Re: can spark-csv package accept strings instead of files?

2016-04-15 Thread Benjamin Kim
Hi Hyukjin,

I saw that. I don’t know how to use it. I’m still learning Scala on my own. Can 
you help me to start?

Thanks,
Ben

> On Apr 15, 2016, at 8:02 AM, Hyukjin Kwon <gurwls...@gmail.com> wrote:
> 
> I hope it was not too late :).
> 
> It is possible.
> 
> Please check csvRdd api here, 
> https://github.com/databricks/spark-csv/blob/master/src/main/scala/com/databricks/spark/csv/CsvParser.scala#L150
>  
> <https://github.com/databricks/spark-csv/blob/master/src/main/scala/com/databricks/spark/csv/CsvParser.scala#L150>.
> 
> Thanks!
> 
> On 2 Apr 2016 2:47 a.m., "Benjamin Kim" <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Does anyone know if this is possible? I have an RDD loaded with rows of CSV 
> data strings. Each string representing the header row and multiple rows of 
> data along with delimiters. I would like to feed each thru a CSV parser to 
> convert the data into a dataframe and, ultimately, UPSERT a Hive/HBase table 
> with this data.
> 
> Please let me know if you have any ideas.
> 
> Thanks,
> Ben
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> <mailto:user-unsubscr...@spark.apache.org>
> For additional commands, e-mail: user-h...@spark.apache.org 
> <mailto:user-h...@spark.apache.org>
> 



JSON Usage

2016-04-14 Thread Benjamin Kim
I was wondering what would be the best way to use JSON in Spark/Scala. I need to 
lookup values of fields in a collection of records to form a URL and download 
that file at that location. I was thinking an RDD would be perfect for this. I 
just want to hear from others who might have more experience in this. Below is 
the actual JSON structure that I am trying to use for the S3 bucket and key 
values of each “record" within “Records".

{  
   "Records":[  
  {  
 "eventVersion":"2.0",
 "eventSource":"aws:s3",
 "awsRegion":"us-east-1",
 "eventTime":The time, in ISO-8601 format, for example, 
1970-01-01T00:00:00.000Z, when S3 finished processing the request,
 "eventName":"event-type",
 "userIdentity":{  
"principalId":"Amazon-customer-ID-of-the-user-who-caused-the-event"
 },
 "requestParameters":{  
"sourceIPAddress":"ip-address-where-request-came-from"
 },
 "responseElements":{  
"x-amz-request-id":"Amazon S3 generated request ID",
"x-amz-id-2":"Amazon S3 host that processed the request"
 },
 "s3":{  
"s3SchemaVersion":"1.0",
"configurationId":"ID found in the bucket notification 
configuration",
"bucket":{  
   "name":"bucket-name",
   "ownerIdentity":{  
  "principalId":"Amazon-customer-ID-of-the-bucket-owner"
   },
   "arn":"bucket-ARN"
},
"object":{  
   "key":"object-key",
   "size":object-size,
   "eTag":"object eTag",
   "versionId":"object version if bucket is versioning-enabled, 
otherwise null",
   "sequencer": "a string representation of a hexadecimal value 
used to determine event sequence, 
   only used with PUTs and DELETEs"
}
 }
  },
  {
  // Additional events
  }
   ]
}

Thanks
Ben
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Monitoring S3 Bucket with Spark Streaming

2016-04-12 Thread Benjamin Kim
All,

I have more of a general Scala JSON question.

I have set up a notification on the S3 source bucket that triggers a Lambda 
function to unzip any new file placed there. The function then saves the 
unzipped CSV file into another destination bucket, where a notification is sent 
to an SQS topic. The message body is JSON whose top level is the “Records” 
collection, which contains one or more “Record” objects. I would like to know 
how to iterate through “Records”, retrieving each “Record” to extract the 
bucket and key values. I would then use this information to download the file 
into a DataFrame via spark-csv. Does anyone have any experience doing this?

I took a quick stab at it, but I know it’s not right.

def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(sc, Seconds(30))   // new context
val records = ssc.receiverStream(new SQSReceiver("amg-events")
.credentials(accessKey, secretKey)
.at(Regions.US_EAST_1)
.withTimeout(2))

records.foreach(record => {
val bucket = record['s3']['bucket']['name']
val key = record['s3']['object']['key']
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.option("inferSchema", "true") // Automatically infer data types
.load("s3://" + bucket + "/" + key)
//save to hbase
})

ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
ssc
}

Thanks,
Ben
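
One way the stab above could be reshaped, offered as a sketch only: it assumes
the receiver emits each SQS message body as a plain JSON string and that
spark-csv plus the fs.s3a credentials are already set up.

import org.apache.spark.sql.functions.explode
import sqlContext.implicits._

records.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // Parse the message bodies, then pull one (bucket, key) pair per Record.
    val fileRefs = sqlContext.read.json(rdd)
      .select(explode($"Records").as("r"))
      .select($"r.s3.bucket.name".as("bucket"), $"r.s3.object.key".as("key"))
      .collect()

    fileRefs.foreach { row =>
      val df = sqlContext.read
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(s"s3a://${row.getString(0)}/${row.getString(1)}")
      // ... save df to HBase here, as intended in the original sketch
    }
  }
}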

> On Apr 9, 2016, at 6:12 PM, Benjamin Kim <bbuil...@gmail.com> wrote:
> 
> Ah, I spoke too soon.
> 
> I thought the SQS part was going to be a Spark package. It looks like it has 
> to be compiled into a jar for use. Am I right? Can someone help with this? I 
> tried to compile it using SBT, but I’m stuck with a SonatypeKeys not found 
> error.
> 
> If there’s an easier alternative, please let me know.
> 
> Thanks,
> Ben
> 
> 
>> On Apr 9, 2016, at 2:49 PM, Benjamin Kim <bbuil...@gmail.com 
>> <mailto:bbuil...@gmail.com>> wrote:
>> 
>> This was easy!
>> 
>> I just created a notification on a source S3 bucket to kick off a Lambda 
>> function that would decompress the dropped file and save it to another S3 
>> bucket. In return, this S3 bucket has a notification to send a SNS message 
>> to me via email. I can just as easily setup SQS to be the endpoint of this 
>> notification. This would then convey to a listening Spark Streaming job the 
>> file information to download. I like this!
>> 
>> Cheers,
>> Ben 
>> 
>>> On Apr 9, 2016, at 9:54 AM, Benjamin Kim <bbuil...@gmail.com 
>>> <mailto:bbuil...@gmail.com>> wrote:
>>> 
>>> This is awesome! I have someplace to start from.
>>> 
>>> Thanks,
>>> Ben
>>> 
>>> 
>>>> On Apr 9, 2016, at 9:45 AM, programminggee...@gmail.com 
>>>> <mailto:programminggee...@gmail.com> wrote:
>>>> 
>>>> Someone please correct me if I am wrong as I am still rather green to 
>>>> spark, however it appears that through the S3 notification mechanism 
>>>> described below, you can publish events to SQS and use SQS as a streaming 
>>>> source into spark. The project at 
>>>> https://github.com/imapi/spark-sqs-receiver 
>>>> <https://github.com/imapi/spark-sqs-receiver> appears to provide libraries 
>>>> for doing this.
>>>> 
>>>> Hope this helps.
>>>> 
>>>> Sent from my iPhone
>>>> 
>>>> On Apr 9, 2016, at 9:55 AM, Benjamin Kim <bbuil...@gmail.com 
>>>> <mailto:bbuil...@gmail.com>> wrote:
>>>> 
>>>>> Nezih,
>>>>> 
>>>>> This looks like a good alternative to having the Spark Streaming job 
>>>>> check for new files on its own. Do you know if there is a way to have the 
>>>>> Spark Streaming job get notified with the new file information and act 
>>>>> upon it? This can reduce the overhead and cost of polling S3. Plus, I can 
>>>>> use this to notify and kick off Lambda to process new data files and make 
>>>>> them ready for Spark Streaming to consume. This will also use 
>>>>> notifications to trigger. I just need to have all incoming folders 
>>>>> configured for notifications for Lambda and all outgoing folders for 
>>>>> Spark Streaming. This sounds like a better setup than we have now.

Re: Monitoring S3 Bucket with Spark Streaming

2016-04-09 Thread Benjamin Kim
Ah, I spoke too soon.

I thought the SQS part was going to be a Spark package. It looks like it has to 
be compiled into a jar for use. Am I right? Can someone help with this? I tried 
to compile it using SBT, but I’m stuck with a SonatypeKeys not found error.

If there’s an easier alternative, please let me know.

Thanks,
Ben


> On Apr 9, 2016, at 2:49 PM, Benjamin Kim <bbuil...@gmail.com> wrote:
> 
> This was easy!
> 
> I just created a notification on a source S3 bucket to kick off a Lambda 
> function that would decompress the dropped file and save it to another S3 
> bucket. In return, this S3 bucket has a notification to send a SNS message to 
> me via email. I can just as easily setup SQS to be the endpoint of this 
> notification. This would then convey to a listening Spark Streaming job the 
> file information to download. I like this!
> 
> Cheers,
> Ben 
> 
>> On Apr 9, 2016, at 9:54 AM, Benjamin Kim <bbuil...@gmail.com 
>> <mailto:bbuil...@gmail.com>> wrote:
>> 
>> This is awesome! I have someplace to start from.
>> 
>> Thanks,
>> Ben
>> 
>> 
>>> On Apr 9, 2016, at 9:45 AM, programminggee...@gmail.com 
>>> <mailto:programminggee...@gmail.com> wrote:
>>> 
>>> Someone please correct me if I am wrong as I am still rather green to 
>>> spark, however it appears that through the S3 notification mechanism 
>>> described below, you can publish events to SQS and use SQS as a streaming 
>>> source into spark. The project at 
>>> https://github.com/imapi/spark-sqs-receiver 
>>> <https://github.com/imapi/spark-sqs-receiver> appears to provide libraries 
>>> for doing this.
>>> 
>>> Hope this helps.
>>> 
>>> Sent from my iPhone
>>> 
>>> On Apr 9, 2016, at 9:55 AM, Benjamin Kim <bbuil...@gmail.com 
>>> <mailto:bbuil...@gmail.com>> wrote:
>>> 
>>>> Nezih,
>>>> 
>>>> This looks like a good alternative to having the Spark Streaming job check 
>>>> for new files on its own. Do you know if there is a way to have the Spark 
>>>> Streaming job get notified with the new file information and act upon it? 
>>>> This can reduce the overhead and cost of polling S3. Plus, I can use this 
>>>> to notify and kick off Lambda to process new data files and make them 
>>>> ready for Spark Streaming to consume. This will also use notifications to 
>>>> trigger. I just need to have all incoming folders configured for 
>>>> notifications for Lambda and all outgoing folders for Spark Streaming. 
>>>> This sounds like a better setup than we have now.
>>>> 
>>>> Thanks,
>>>> Ben
>>>> 
>>>>> On Apr 9, 2016, at 12:25 AM, Nezih Yigitbasi <nyigitb...@netflix.com 
>>>>> <mailto:nyigitb...@netflix.com>> wrote:
>>>>> 
>>>>> While it is doable in Spark, S3 also supports notifications: 
>>>>> http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html 
>>>>> <http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html>
>>>>> 
>>>>> 
>>>>> On Fri, Apr 8, 2016 at 9:15 PM Natu Lauchande <nlaucha...@gmail.com 
>>>>> <mailto:nlaucha...@gmail.com>> wrote:
>>>>> Hi Benjamin,
>>>>> 
>>>>> I have done it . The critical configuration items are the ones below :
>>>>> 
>>>>>   ssc.sparkContext.hadoopConfiguration.set("fs.s3n.impl", 
>>>>> "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
>>>>>   ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", 
>>>>> AccessKeyId)
>>>>>   
>>>>> ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", 
>>>>> AWSSecretAccessKey)
>>>>> 
>>>>>   val inputS3Stream =  ssc.textFileStream("s3://example_bucket/folder 
>>>>> ")
>>>>> 
>>>>> This code will probe for new S3 files created in your every batch 
>>>>> interval.
>>>>> 
>>>>> Thanks,
>>>>> Natu
>>>>> 
>>>>> On Fri, Apr 8, 2016 at 9:14 PM, Benjamin Kim <bbuil...@gmail.com 
>>>>> <mailto:bbuil...@gmail.com>> wrote:
>>>>> Has anyone monitored an S3 bucket or directory using Spark Streaming and 
>>>>> pulled any new files to process? If so, can you provide basic Scala 
>>>>> coding help on this?
>>>>> 
>>>>> Thanks,
>>>>> Ben
>>>>> 
>>>>> 
>>>>> -
>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
>>>>> <mailto:user-unsubscr...@spark.apache.org>
>>>>> For additional commands, e-mail: user-h...@spark.apache.org 
>>>>> <mailto:user-h...@spark.apache.org>
>>>>> 
>>>>> 
>>>> 
>> 
> 



Re: Monitoring S3 Bucket with Spark Streaming

2016-04-09 Thread Benjamin Kim
This was easy!

I just created a notification on a source S3 bucket to kick off a Lambda 
function that would decompress the dropped file and save it to another S3 
bucket. In return, this S3 bucket has a notification to send a SNS message to 
me via email. I can just as easily setup SQS to be the endpoint of this 
notification. This would then convey to a listening Spark Streaming job the 
file information to download. I like this!

Cheers,
Ben 

> On Apr 9, 2016, at 9:54 AM, Benjamin Kim <bbuil...@gmail.com> wrote:
> 
> This is awesome! I have someplace to start from.
> 
> Thanks,
> Ben
> 
> 
>> On Apr 9, 2016, at 9:45 AM, programminggee...@gmail.com 
>> <mailto:programminggee...@gmail.com> wrote:
>> 
>> Someone please correct me if I am wrong as I am still rather green to spark, 
>> however it appears that through the S3 notification mechanism described 
>> below, you can publish events to SQS and use SQS as a streaming source into 
>> spark. The project at https://github.com/imapi/spark-sqs-receiver 
>> <https://github.com/imapi/spark-sqs-receiver> appears to provide libraries 
>> for doing this.
>> 
>> Hope this helps.
>> 
>> Sent from my iPhone
>> 
>> On Apr 9, 2016, at 9:55 AM, Benjamin Kim <bbuil...@gmail.com 
>> <mailto:bbuil...@gmail.com>> wrote:
>> 
>>> Nezih,
>>> 
>>> This looks like a good alternative to having the Spark Streaming job check 
>>> for new files on its own. Do you know if there is a way to have the Spark 
>>> Streaming job get notified with the new file information and act upon it? 
>>> This can reduce the overhead and cost of polling S3. Plus, I can use this 
>>> to notify and kick off Lambda to process new data files and make them ready 
>>> for Spark Streaming to consume. This will also use notifications to 
>>> trigger. I just need to have all incoming folders configured for 
>>> notifications for Lambda and all outgoing folders for Spark Streaming. This 
>>> sounds like a better setup than we have now.
>>> 
>>> Thanks,
>>> Ben
>>> 
>>>> On Apr 9, 2016, at 12:25 AM, Nezih Yigitbasi <nyigitb...@netflix.com 
>>>> <mailto:nyigitb...@netflix.com>> wrote:
>>>> 
>>>> While it is doable in Spark, S3 also supports notifications: 
>>>> http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html 
>>>> <http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html>
>>>> 
>>>> 
>>>> On Fri, Apr 8, 2016 at 9:15 PM Natu Lauchande <nlaucha...@gmail.com 
>>>> <mailto:nlaucha...@gmail.com>> wrote:
>>>> Hi Benjamin,
>>>> 
>>>> I have done it . The critical configuration items are the ones below :
>>>> 
>>>>   ssc.sparkContext.hadoopConfiguration.set("fs.s3n.impl", 
>>>> "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
>>>>   ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", 
>>>> AccessKeyId)
>>>>   
>>>> ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", 
>>>> AWSSecretAccessKey)
>>>> 
>>>>   val inputS3Stream =  ssc.textFileStream("s3://example_bucket/folder 
>>>> ")
>>>> 
>>>> This code will probe for new S3 files created in your every batch interval.
>>>> 
>>>> Thanks,
>>>> Natu
>>>> 
>>>> On Fri, Apr 8, 2016 at 9:14 PM, Benjamin Kim <bbuil...@gmail.com 
>>>> <mailto:bbuil...@gmail.com>> wrote:
>>>> Has anyone monitored an S3 bucket or directory using Spark Streaming and 
>>>> pulled any new files to process? If so, can you provide basic Scala coding 
>>>> help on this?
>>>> 
>>>> Thanks,
>>>> Ben
>>>> 
>>>> 
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
>>>> <mailto:user-unsubscr...@spark.apache.org>
>>>> For additional commands, e-mail: user-h...@spark.apache.org 
>>>> <mailto:user-h...@spark.apache.org>
>>>> 
>>>> 
>>> 
> 



Re: Monitoring S3 Bucket with Spark Streaming

2016-04-09 Thread Benjamin Kim
This is awesome! I have someplace to start from.

Thanks,
Ben


> On Apr 9, 2016, at 9:45 AM, programminggee...@gmail.com wrote:
> 
> Someone please correct me if I am wrong as I am still rather green to spark, 
> however it appears that through the S3 notification mechanism described 
> below, you can publish events to SQS and use SQS as a streaming source into 
> spark. The project at https://github.com/imapi/spark-sqs-receiver 
> <https://github.com/imapi/spark-sqs-receiver> appears to provide libraries 
> for doing this.
> 
> Hope this helps.
> 
> Sent from my iPhone
> 
> On Apr 9, 2016, at 9:55 AM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> 
>> Nezih,
>> 
>> This looks like a good alternative to having the Spark Streaming job check 
>> for new files on its own. Do you know if there is a way to have the Spark 
>> Streaming job get notified with the new file information and act upon it? 
>> This can reduce the overhead and cost of polling S3. Plus, I can use this to 
>> notify and kick off Lambda to process new data files and make them ready for 
>> Spark Streaming to consume. This will also use notifications to trigger. I 
>> just need to have all incoming folders configured for notifications for 
>> Lambda and all outgoing folders for Spark Streaming. This sounds like a 
>> better setup than we have now.
>> 
>> Thanks,
>> Ben
>> 
>>> On Apr 9, 2016, at 12:25 AM, Nezih Yigitbasi <nyigitb...@netflix.com 
>>> <mailto:nyigitb...@netflix.com>> wrote:
>>> 
>>> While it is doable in Spark, S3 also supports notifications: 
>>> http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html 
>>> <http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html>
>>> 
>>> 
>>> On Fri, Apr 8, 2016 at 9:15 PM Natu Lauchande <nlaucha...@gmail.com 
>>> <mailto:nlaucha...@gmail.com>> wrote:
>>> Hi Benjamin,
>>> 
>>> I have done it . The critical configuration items are the ones below :
>>> 
>>>   ssc.sparkContext.hadoopConfiguration.set("fs.s3n.impl", 
>>> "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
>>>   ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", 
>>> AccessKeyId)
>>>   ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", 
>>> AWSSecretAccessKey)
>>> 
>>>   val inputS3Stream =  ssc.textFileStream("s3://example_bucket/folder 
>>> ")
>>> 
>>> This code will probe for new S3 files created in your every batch interval.
>>> 
>>> Thanks,
>>> Natu
>>> 
>>> On Fri, Apr 8, 2016 at 9:14 PM, Benjamin Kim <bbuil...@gmail.com 
>>> <mailto:bbuil...@gmail.com>> wrote:
>>> Has anyone monitored an S3 bucket or directory using Spark Streaming and 
>>> pulled any new files to process? If so, can you provide basic Scala coding 
>>> help on this?
>>> 
>>> Thanks,
>>> Ben
>>> 
>>> 
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
>>> <mailto:user-unsubscr...@spark.apache.org>
>>> For additional commands, e-mail: user-h...@spark.apache.org 
>>> <mailto:user-h...@spark.apache.org>
>>> 
>>> 
>> 



Re: Monitoring S3 Bucket with Spark Streaming

2016-04-09 Thread Benjamin Kim
Nezih,

This looks like a good alternative to having the Spark Streaming job check for 
new files on its own. Do you know if there is a way to have the Spark Streaming 
job get notified with the new file information and act upon it? This can reduce 
the overhead and cost of polling S3. Plus, I can use this to notify and kick 
off Lambda to process new data files and make them ready for Spark Streaming to 
consume. This will also use notifications to trigger. I just need to have all 
incoming folders configured for notifications for Lambda and all outgoing 
folders for Spark Streaming. This sounds like a better setup than we have now.

Thanks,
Ben

> On Apr 9, 2016, at 12:25 AM, Nezih Yigitbasi <nyigitb...@netflix.com> wrote:
> 
> While it is doable in Spark, S3 also supports notifications: 
> http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html 
> <http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html>
> 
> 
> On Fri, Apr 8, 2016 at 9:15 PM Natu Lauchande <nlaucha...@gmail.com 
> <mailto:nlaucha...@gmail.com>> wrote:
> Hi Benjamin,
> 
> I have done it . The critical configuration items are the ones below :
> 
>   ssc.sparkContext.hadoopConfiguration.set("fs.s3n.impl", 
> "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
>   ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", 
> AccessKeyId)
>   ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", 
> AWSSecretAccessKey)
> 
>   val inputS3Stream =  ssc.textFileStream("s3://example_bucket/folder")
> 
> This code will probe for new S3 files created in your every batch interval.
> 
> Thanks,
> Natu
> 
> On Fri, Apr 8, 2016 at 9:14 PM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Has anyone monitored an S3 bucket or directory using Spark Streaming and 
> pulled any new files to process? If so, can you provide basic Scala coding 
> help on this?
> 
> Thanks,
> Ben
> 
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> <mailto:user-unsubscr...@spark.apache.org>
> For additional commands, e-mail: user-h...@spark.apache.org 
> <mailto:user-h...@spark.apache.org>
> 
> 



Re: Monitoring S3 Bucket with Spark Streaming

2016-04-09 Thread Benjamin Kim
Natu,

Do you know if textFileStream can see if new files are created underneath a 
whole bucket? For example, if the bucket name is incoming and new files 
underneath it are 2016/04/09/00/00/01/data.csv and 
2016/04/09/00/00/02/data.csv, will these files be picked up? Also, will Spark 
Streaming avoid picking up these files again on the following run, knowing that 
it already picked them up, or do we have to store state somewhere, like the 
last run date and time, to compare against?

Thanks,
Ben

> On Apr 8, 2016, at 9:15 PM, Natu Lauchande <nlaucha...@gmail.com> wrote:
> 
> Hi Benjamin,
> 
> I have done it . The critical configuration items are the ones below :
> 
>   ssc.sparkContext.hadoopConfiguration.set("fs.s3n.impl", 
> "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
>   ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", 
> AccessKeyId)
>   ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", 
> AWSSecretAccessKey)
> 
>   val inputS3Stream =  ssc.textFileStream("s3://example_bucket/folder")
> 
> This code will probe for new S3 files created in your every batch interval.
> 
> Thanks,
> Natu
> 
> On Fri, Apr 8, 2016 at 9:14 PM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Has anyone monitored an S3 bucket or directory using Spark Streaming and 
> pulled any new files to process? If so, can you provide basic Scala coding 
> help on this?
> 
> Thanks,
> Ben
> 
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> <mailto:user-unsubscr...@spark.apache.org>
> For additional commands, e-mail: user-h...@spark.apache.org 
> <mailto:user-h...@spark.apache.org>
> 
> 



Monitoring S3 Bucket with Spark Streaming

2016-04-08 Thread Benjamin Kim
Has anyone monitored an S3 bucket or directory using Spark Streaming and pulled 
any new files to process? If so, can you provide basic Scala coding help on 
this?

Thanks,
Ben





can spark-csv package accept strings instead of files?

2016-04-01 Thread Benjamin Kim
Does anyone know if this is possible? I have an RDD loaded with rows of CSV 
data strings. Each string represents the header row and multiple rows of data, 
along with delimiters. I would like to feed each through a CSV parser to convert 
the data into a dataframe and, ultimately, UPSERT a Hive/HBase table with this 
data.
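
In case it matters, the crude fallback I can think of looks like the sketch below, 
but I would much rather lean on the spark-csv parser for quoting and type 
inference. It assumes a hypothetical csvStrings: RDD[String] where each element is 
one whole file (header line first) and simple comma-delimited data with no quoted 
fields.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Build a schema from the header line of the first file, treating every column as a string.
val header = csvStrings.first().split("\n").head.split(",")
val schema = StructType(header.map(name => StructField(name, StringType, nullable = true)))

// Drop each file's header line, split the remaining lines into columns, and build the DataFrame.
val rows = csvStrings
  .flatMap(_.split("\n").drop(1))
  .map(line => Row.fromSeq(line.split(",", -1).toSeq))

val df = sqlContext.createDataFrame(rows, schema)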

Please let me know if you have any ideas.

Thanks,
Ben



Re: Does Spark CSV accept a CSV String

2016-03-30 Thread Benjamin Kim
Hi Mich,

I forgot to mention that - this is the ugly part - the source data provider 
gives us (Windows) pkzip-compressed files. Will Spark uncompress these 
automatically? I haven’t been able to make it work.

Thanks,
Ben

> On Mar 30, 2016, at 2:27 PM, Mich Talebzadeh <mich.talebza...@gmail.com> 
> wrote:
> 
> Hi Ben,
> 
> Well I have done it for standard csv files downloaded from spreadsheets to 
> staging directory on hdfs and loaded from there.
> 
> First you may not need to unzip them. Databricks can read them (in my case) as 
> zipped files.
> 
> Check this. Mine is slightly different from what you have. First I zip my csv 
> files with bzip2 and load them into hdfs
> 
> #!/bin/ksh
> DIR="/data/stg/accounts/nw/10124772"
> #
> ## Compress the files
> #
> echo `date` " ""===  Started compressing all csv FILEs"
> for FILE in `ls *.csv`
> do
>   /usr/bin/bzip2 ${FILE}
> done
> #
> ## Clear out hdfs staging directory
> #
> echo `date` " ""===  Started deleting old files from hdfs staging 
> directory ${DIR}"
> hdfs dfs -rm -r ${DIR}/*.bz2
> echo `date` " ""===  Started Putting bz2 fileS to hdfs staging directory 
> ${DIR}"
> for FILE in `ls *.bz2`
> do
>   hdfs dfs -copyFromLocal ${FILE} ${DIR}
> done
> echo `date` " ""===  Checking that all files are moved to hdfs staging 
> directory"
> hdfs dfs -ls ${DIR}
> exit 0
> 
> Now you have all your csv files in the staging directory
> 
> import org.apache.spark.sql.functions._
> import java.sql.{Date, Timestamp}
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); sqlContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') 
> ").collect.foreach(println)
> 
> val df = 
> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", 
> "true").option("header", 
> "true").load("hdfs://rhes564:9000/data/stg/accounts/nw/10124772")
> case class Accounts( TransactionDate: String, TransactionType: String, 
> Description: String, Value: Double, Balance: Double, AccountName: String, 
> AccountNumber : String)
> // Map the columns to names
> //
> val a = df.filter(col("Date") > "").map(p => 
> Accounts(p(0).toString,p(1).toString,p(2).toString,p(3).toString.toDouble,p(4).toString.toDouble,p(5).toString,p(6).toString))
> //
> // Create a Spark temporary table
> //
> a.toDF.registerTempTable("tmp")
> 
> // Need to create and populate target ORC table nw_10124772 in database 
> accounts in Hive
> //
> sql("use accounts")
> //
> // Drop and create table nw_10124772
> //
> sql("DROP TABLE IF EXISTS accounts.nw_10124772")
> var sqltext : String = ""
> sqltext = """
> CREATE TABLE accounts.nw_10124772 (
> TransactionDate   DATE
> ,TransactionType   String
> ,Description   String
> ,Value Double
> ,Balance   Double
> ,AccountName   String
> ,AccountNumber Int
> )
> COMMENT 'from csv file from excel sheet'
> STORED AS ORC
> TBLPROPERTIES ( "orc.compress"="ZLIB" )
> """
> sql(sqltext)
> //
> // Put data in Hive table. Clean up is already done
> //
> sqltext = """
> INSERT INTO TABLE accounts.nw_10124772
> SELECT
>   
> TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(TransactionDate,'dd/MM/yyyy'),'yyyy-MM-dd'))
>  AS TransactionDate
> , TransactionType
> , Description
> , Value
> , Balance
> , AccountName
>     , AccountNumber
> FROM tmp
> """
> sql(sqltext)
> 
> println ("\nFinished at"); sqlContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
> 
> Once you store into a some form of table (Parquet, ORC) etc you can do 
> whatever you like with it.
> 
> HTH
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>
>  
> http://talebzadehmich.wordpress.com <http://talebzadehmich.wordpress.com/>
>  
> 
> On 30 March 2016 at 22:13, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Hi Mich,
> 
> You are correct. I am talking about the Databricks package spark-csv you have below.

Re: Does Spark CSV accept a CSV String

2016-03-30 Thread Benjamin Kim
Hi Mich,

You are correct. I am talking about the Databricks package spark-csv you have 
below.

The files are stored in s3 and I download, unzip, and store each one of them in 
a variable as a string using the AWS SDK (aws-java-sdk-1.10.60.jar).

Here is some of the code.

val filesRdd = sc.parallelize(lFiles, 250)
filesRdd.foreachPartition(files => {
  val s3Client = new AmazonS3Client(new 
EnvironmentVariableCredentialsProvider())
  files.foreach(file => {
val s3Object = s3Client.getObject(new GetObjectRequest(s3Bucket, file))
val zipFile = new ZipInputStream(s3Object.getObjectContent())
val csvFile = readZipStream(zipFile)
  })
})

This function does the unzipping and converts to string.

def readZipStream(stream: ZipInputStream): String = {
  stream.getNextEntry
  var stuff = new ListBuffer[String]()
  val scanner = new Scanner(stream)
  while(scanner.hasNextLine){
stuff += scanner.nextLine
  }
  stuff.toList.mkString("\n")
}

The next step is to parse the CSV string and convert to a dataframe, which will 
populate a Hive/HBase table.
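
The kind of thing I am hoping is possible looks roughly like the sketch below. It 
assumes that spark-csv's CsvParser exposes a csvRdd method taking an RDD[String] of 
lines (I have not confirmed this against version 1.3.0) and that the unzipped 
string has been brought back to the driver.

import com.databricks.spark.csv.CsvParser

// csvString holds one file's full contents, header line included (e.g. from readZipStream).
val linesRdd = sc.parallelize(csvString.split("\n"))

val df = new CsvParser()
  .withUseHeader(true)      // treat the first line as the header
  .withInferSchema(true)    // let spark-csv guess the column types
  .csvRdd(sqlContext, linesRdd)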

If you can help, I would be truly grateful.

Thanks,
Ben


> On Mar 30, 2016, at 2:06 PM, Mich Talebzadeh <mich.talebza...@gmail.com> 
> wrote:
> 
> just to clarify are you talking about databricks csv package.
> 
> $SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.3.0
> 
> Where are these zipped files? Are they copied to a staging directory in hdfs?
> 
> HTH
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>
>  
> http://talebzadehmich.wordpress.com <http://talebzadehmich.wordpress.com/>
>  
> 
> On 30 March 2016 at 15:17, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> I have a quick question. I have downloaded multiple zipped files from S3 and 
> unzipped each one of them into strings. The next step is to parse using a CSV 
> parser. I want to know if there is a way to easily use the spark csv package 
> for this?
> 
> Thanks,
> Ben
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> <mailto:user-unsubscr...@spark.apache.org>
> For additional commands, e-mail: user-h...@spark.apache.org 
> <mailto:user-h...@spark.apache.org>
> 
> 



Does Spark CSV accept a CSV String

2016-03-30 Thread Benjamin Kim
I have a quick question. I have downloaded multiple zipped files from S3 and 
unzipped each one of them into strings. The next step is to parse using a CSV 
parser. I want to know if there is a way to easily use the spark csv package 
for this?

Thanks,
Ben



BinaryFiles to ZipInputStream

2016-03-23 Thread Benjamin Kim
I need a little help. I am loading into Spark 1.6 zipped csv files stored in s3.

First of all, I am able to get the List of file keys that have a modified date 
within a range of time by using the AWS SDK Objects (AmazonS3Client, 
ObjectListing, S3ObjectSummary, ListObjectsRequest, GetObjectRequest). Then, by 
setting up the HadoopConfiguration object with s3 access and secret keys, I 
parallelize, partition, and iterate through the List to load each file’s 
contents into a RDD[(String, org.apache.spark.input.PortableDataStream)].

val hadoopConf = sc.hadoopConfiguration;
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", accessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey", secretKey)

val filesRdd = sc.parallelize(lFiles)
filesRdd.foreachPartition(files => {
  val lZipFiles = files.map(x => sc.binaryFiles("s3://" + s3Bucket + "/" + x))
  val lZipStream = lZipFiles.map(x => new ZipInputStream(x)) // make them all 
zip input streams
  val lStrContent = lZipStream.map(x => readZipStream(x))  // read contents 
into string 

})

This is where I need help. I get this error.

:196: error: type mismatch;
 found   : org.apache.spark.rdd.RDD[(String, 
org.apache.spark.input.PortableDataStream)]
 required: java.io.InputStream
val lZipStream = lZipFiles.map(x => new ZipInputStream(x)) // 
make them all zip input streams

   ^

Does anyone know how to load the PortableDataStream returned in a RDD and 
convert it into a ZipInputStream?
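
What I am ultimately aiming for is something like the sketch below. It assumes 
that sc.binaryFiles accepts a comma-separated list of paths the way textFile does 
(not verified) and that a readZipStream(stream: ZipInputStream): String helper is 
available to drain each stream into a String.

import java.util.zip.ZipInputStream

// Build one comma-separated path string on the driver and let Spark do the reads.
val paths = lFiles.map(key => "s3://" + s3Bucket + "/" + key).mkString(",")

val unzipped = sc.binaryFiles(paths).map { case (path, stream) =>
  // PortableDataStream.open() returns a DataInputStream, which ZipInputStream can wrap.
  val zis = new ZipInputStream(stream.open())
  (path, readZipStream(zis))   // (file path, CSV contents as a String)
}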

Thanks,
Ben



Re: new object store driver for Spark

2016-03-22 Thread Benjamin Kim
Hi Gil,

Currently, our company uses S3 heavily for data storage. Can you further explain 
the benefits of this connector over plain S3 access once the pending patch comes 
out? Also, I have heard of Swift from others. Can you explain to me the pros and 
cons of Swift compared to HDFS? A brief summary is fine, or just point me to 
material that will help me get a better understanding.

Thanks,
Ben

> On Mar 22, 2016, at 6:35 AM, Gil Vernik  wrote:
> 
> We recently released an object store connector for Spark. 
> https://github.com/SparkTC/stocator 
> Currently this connector contains driver for the Swift based object store ( 
> like SoftLayer or any other Swift cluster ), but it can easily support 
> additional object stores.
> There is a pending patch to support Amazon S3 object store. 
> 
> The major highlight is that this connector doesn't create any temporary files 
>  and so it achieves very fast response times when Spark persist data in the 
> object store.
> The new connector supports speculate mode and covers various failure 
> scenarios ( like two Spark tasks writing into same object, partial corrupted 
> data due to run time exceptions in Spark master, etc ).  It also covers 
> https://issues.apache.org/jira/browse/SPARK-10063 
> and other known issues.
> 
> The detail algorithm for fault tolerance will be released very soon. For now, 
> those who interested, can view the implementation in the code itself.
> 
>  https://github.com/SparkTC/stocator 
> contains all the details how to setup 
> and use with Spark.
> 
> A series of tests showed that the new connector obtains 70% improvements for 
> write operations from Spark to Swift and about 30% improvements for read 
> operations from Swift into Spark ( comparing to the existing driver that 
> Spark uses to integrate with objects stored in Swift). 
> 
> There is an ongoing work to add more coverage and fix some known bugs / 
> limitations.
> 
> All the best
> Gil
> 



Re: S3 Zip File Loading Advice

2016-03-15 Thread Benjamin Kim
Hi Xinh,

I tried to wrap it, but it still didn’t work. I got a 
"java.util.ConcurrentModificationException”.

All,

I have been trying and trying with some help from a coworker, but it’s slow 
going. I have been able to gather a list of the S3 files I need to download.

### S3 Lists ###
import scala.collection.JavaConverters._
import java.util.ArrayList
import java.util.zip.{ZipEntry, ZipInputStream}
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.{ObjectListing, S3ObjectSummary, 
ListObjectsRequest, GetObjectRequest}
import org.apache.commons.io.IOUtils
import org.joda.time.{DateTime, Period}
import org.joda.time.format.DateTimeFormat

val s3Bucket = "amg-events"

val formatter = DateTimeFormat.forPattern("yyyy/MM/dd/HH")
var then = DateTime.now()

var files = new ArrayList[String]

//S3 Client and List Object Request
val s3Client = new AmazonS3Client()
val listObjectsRequest = new ListObjectsRequest()
var objectListing: ObjectListing = null

//Your S3 Bucket
listObjectsRequest.setBucketName(s3Bucket)

var now = DateTime.now()
var range = 
Iterator.iterate(now.minusDays(1))(_.plus(Period.hours(1))).takeWhile(!_.isAfter(now))
range.foreach(ymdh => {
  //Your Folder path or Prefix
  listObjectsRequest.setPrefix(formatter.print(ymdh))

  //Adding s3:// to the paths and adding to a list
  do {
objectListing = s3Client.listObjects(listObjectsRequest);
for (objectSummary <- objectListing.getObjectSummaries().asScala) {
  if (objectSummary.getKey().contains(".csv.zip") && 
objectSummary.getLastModified().after(then.toDate())) {
//files.add(objectSummary.getKey())
files.add("s3n://" + s3Bucket + "/" + objectSummary.getKey())
  }
}
listObjectsRequest.setMarker(objectListing.getNextMarker())
  } while (objectListing.isTruncated())
})
then = now

//Creating a Scala List for same
val fileList = files.asScala

//Parallelize the Scala List
val fileRDD = sc.parallelize(fileList)

Now, I am trying to go through the list and download each file, unzip each file 
as it comes, and pass the ZipInputStream to the CSV parser. This is where I get 
stuck.

var df: DataFrame = null
for (file <- fileList) {
  val zipfile = s3Client.getObject(new GetObjectRequest(s3Bucket, 
file)).getObjectContent()
  val zis = new ZipInputStream(zipfile)
  var ze = zis.getNextEntry()
//  val fileDf = 
sqlContext.read.format("com.databricks.spark.csv").option("header", 
"true").option("inferSchema", "true").load(zis)
//  if (df != null) {
//df = df.unionAll(fileDf)
//  } else {
//df = fileDf
//  }
}

I don’t know if I am doing it right or not. I also read that parallelizing 
fileList would allow parallel file retrieval. But, I don’t know how to proceed 
from here.
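
To make the question concrete, the parallel version I have in mind looks roughly 
like the sketch below. It assumes the list holds plain object keys (the 
commented-out variant above, without the "s3n://" prefix) and a 
readZipStream(stream: ZipInputStream): String helper like the one I posted in 
another thread; I have not gotten this to run yet.

val csvStrings = fileRDD.mapPartitions { keys =>
  // One S3 client per partition, created on the executor side.
  val s3Client = new AmazonS3Client()
  keys.map { key =>
    val s3Object = s3Client.getObject(new GetObjectRequest(s3Bucket, key))
    val zis = new ZipInputStream(s3Object.getObjectContent())
    (key, readZipStream(zis))   // (object key, unzipped CSV contents as a String)
  }
}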

If you can help, I would be grateful.

Thanks,
Ben


> On Mar 9, 2016, at 10:10 AM, Xinh Huynh <xinh.hu...@gmail.com> wrote:
> 
> Could you wrap the ZipInputStream in a List, since a subtype of 
> TraversableOnce[?] is required?
> 
> case (name, content) => List(new ZipInputStream(content.open))
> 
> Xinh
> 
> On Wed, Mar 9, 2016 at 7:07 AM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Hi Sabarish,
> 
> I found a similar posting online where I should use the S3 listKeys. 
> http://stackoverflow.com/questions/24029873/how-to-read-multiple-text-files-into-a-single-rdd
>  
> <http://stackoverflow.com/questions/24029873/how-to-read-multiple-text-files-into-a-single-rdd>.
>  Is this what you were thinking?
> 
> And, your assumption is correct. The zipped CSV file contains only a single 
> file. I found this posting. 
> http://stackoverflow.com/questions/28969757/zip-support-in-apache-spark 
> <http://stackoverflow.com/questions/28969757/zip-support-in-apache-spark>. I 
> see how to do the unzipping, but I cannot get it to work when running the 
> code directly.
> 
> ...
> import java.io <http://java.io/>.{ IOException, FileOutputStream, 
> FileInputStream, File }
> import java.util.zip.{ ZipEntry, ZipInputStream }
> import org.apache.spark.input.PortableDataStream
> 
> sc.hadoopConfiguration.set("fs.s3n.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem")
> sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", accessKey)
> sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secretKey)
> 
> val zipFile = 
> "s3n://events/2016/03/01/00/event-20160301.00-4877ff81-928f-4da4-89b6-6d40a28d61c7.csv.zip
>  <>"
> val zipFileRDD = sc.binaryFiles(zipFile).flatMap { case (name: String, 
> content: PortableDataStream) => new ZipInputStream(content.open) }
> 
> :95: error: type mismatch;
>  found   : java.util.zip.ZipInputStream
&

Re: Spark Job on YARN accessing Hbase Table

2016-03-13 Thread Benjamin Kim
Ted,

Is there anything in the works or are there tasks already to do the 
back-porting?

Just curious.

Thanks,
Ben

> On Mar 13, 2016, at 3:46 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> 
> class HFileWriterImpl (in standalone file) is only present in master branch.
> It is not in branch-1.
> 
> compressionByName() resides in class with @InterfaceAudience.Private which 
> got moved in master branch.
> 
> So looks like there is some work to be done for backporting to branch-1 :-)
> 
> On Sun, Mar 13, 2016 at 1:35 PM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Ted,
> 
> I did as you said, but it looks like that HBaseContext relies on some 
> differences in HBase itself.
> 
> [ERROR] 
> /home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:30:
>  error: object HFileWriterImpl is not a member of package 
> org.apache.hadoop.hbase.io.hfile
> [ERROR] import org.apache.hadoop.hbase.io.hfile.{CacheConfig, 
> HFileContextBuilder, HFileWriterImpl}
> [ERROR]^
> [ERROR] 
> /home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:627:
>  error: not found: value HFileWriterImpl
> [ERROR] val hfileCompression = HFileWriterImpl
> [ERROR]^
> [ERROR] 
> /home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:750:
>  error: not found: value HFileWriterImpl
> [ERROR] val defaultCompression = HFileWriterImpl
> [ERROR]  ^
> [ERROR] 
> /home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:898:
>  error: value COMPARATOR is not a member of object 
> org.apache.hadoop.hbase.CellComparator
> [ERROR] 
> .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
> 
> So… back to my original question… do you know when these incompatibilities 
> were introduced? If so, I can pulled that version at time and try again.
> 
> Thanks,
> Ben
> 
>> On Mar 13, 2016, at 12:42 PM, Ted Yu <yuzhih...@gmail.com 
>> <mailto:yuzhih...@gmail.com>> wrote:
>> 
>> Benjamin:
>> Since hbase-spark is in its own module, you can pull the whole hbase-spark 
>> subtree into hbase 1.0 root dir and add the following to root pom.xml:
>> <module>hbase-spark</module>
>> 
>> Then you would be able to build the module yourself.
>> 
>> hbase-spark module uses APIs which are compatible with hbase 1.0
>> 
>> Cheers
>> 
>> On Sun, Mar 13, 2016 at 11:39 AM, Benjamin Kim <bbuil...@gmail.com 
>> <mailto:bbuil...@gmail.com>> wrote:
>> Hi Ted,
>> 
>> I see that you’re working on the hbase-spark module for hbase. I recently 
>> packaged the SparkOnHBase project and gave it a test run. It works like a 
>> charm on CDH 5.4 and 5.5. All I had to do was add 
>> /opt/cloudera/parcels/CDH/jars/htrace-core-3.1.0-incubating.jar to the 
>> classpath.txt file in /etc/spark/conf. Then, I ran spark-shell with “—jars 
>> /path/to/spark-hbase-0.0.2-clabs.jar” as an argument and used the 
>> easy-to-use HBaseContext for HBase operations. Now, I want to use the latest 
>> in Dataframes. Since the new functionality is only in the hbase-spark 
>> module, I want to know how to get it and package it for CDH 5.5, which still 
>> uses HBase 1.0.0. Can you tell me what version of hbase master is still 
>> backwards compatible?
>> 
>> By the way, we are using Spark 1.6 if it matters.
>> 
>> Thanks,
>> Ben
>> 
>>> On Feb 10, 2016, at 2:34 AM, Ted Yu <yuzhih...@gmail.com 
>>> <mailto:yuzhih...@gmail.com>> wrote:
>>> 
>>> Have you tried adding hbase client jars to spark.executor.extraClassPath ?
>>> 
>>> Cheers
>>> 
>>> On Wed, Feb 10, 2016 at 12:17 AM, Prabhu Joseph <prabhujose.ga...@gmail.com 
>>> <mailto:prabhujose.ga...@gmail.com>> wrote:
>>> + Spark-Dev
>>> 
>>> For a Spark job on YARN accessing hbase table, added all hbase client jars 
>>> into spark.yarn.dist.files, NodeManager when launching container i.e 
>>> executor, does localization and brings all hbase-client jars into executor 
>>> CWD, but still the executor tasks fail with ClassNotFoundException of hbase 
>>> client jars, when i checked launch container.sh , Classpath does not have 
>>> $PWD/* and hence all the hbase client jars are ignored.
>>> 
>>> Is spark.yarn.dist.files not for adding jars into the executor classpath.
>>> 
>>> T

Re: Spark Job on YARN accessing Hbase Table

2016-03-13 Thread Benjamin Kim
Ted,

I did as you said, but it looks like that HBaseContext relies on some 
differences in HBase itself.

[ERROR] 
/home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:30:
 error: object HFileWriterImpl is not a member of package 
org.apache.hadoop.hbase.io.hfile
[ERROR] import org.apache.hadoop.hbase.io.hfile.{CacheConfig, 
HFileContextBuilder, HFileWriterImpl}
[ERROR]^
[ERROR] 
/home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:627:
 error: not found: value HFileWriterImpl
[ERROR] val hfileCompression = HFileWriterImpl
[ERROR]^
[ERROR] 
/home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:750:
 error: not found: value HFileWriterImpl
[ERROR] val defaultCompression = HFileWriterImpl
[ERROR]  ^
[ERROR] 
/home/bkim/hbase-rel-1.0.2/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala:898:
 error: value COMPARATOR is not a member of object 
org.apache.hadoop.hbase.CellComparator
[ERROR] 
.withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)

So… back to my original question… do you know when these incompatibilities were 
introduced? If so, I can pull the version from that point in time and try again.

Thanks,
Ben

> On Mar 13, 2016, at 12:42 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> 
> Benjamin:
> Since hbase-spark is in its own module, you can pull the whole hbase-spark 
> subtree into hbase 1.0 root dir and add the following to root pom.xml:
> <module>hbase-spark</module>
> 
> Then you would be able to build the module yourself.
> 
> hbase-spark module uses APIs which are compatible with hbase 1.0
> 
> Cheers
> 
> On Sun, Mar 13, 2016 at 11:39 AM, Benjamin Kim <bbuil...@gmail.com 
> <mailto:bbuil...@gmail.com>> wrote:
> Hi Ted,
> 
> I see that you’re working on the hbase-spark module for hbase. I recently 
> packaged the SparkOnHBase project and gave it a test run. It works like a 
> charm on CDH 5.4 and 5.5. All I had to do was add 
> /opt/cloudera/parcels/CDH/jars/htrace-core-3.1.0-incubating.jar to the 
> classpath.txt file in /etc/spark/conf. Then, I ran spark-shell with “—jars 
> /path/to/spark-hbase-0.0.2-clabs.jar” as an argument and used the easy-to-use 
> HBaseContext for HBase operations. Now, I want to use the latest in 
> Dataframes. Since the new functionality is only in the hbase-spark module, I 
> want to know how to get it and package it for CDH 5.5, which still uses HBase 
> 1.0.0. Can you tell me what version of hbase master is still backwards 
> compatible?
> 
> By the way, we are using Spark 1.6 if it matters.
> 
> Thanks,
> Ben
> 
>> On Feb 10, 2016, at 2:34 AM, Ted Yu <yuzhih...@gmail.com 
>> <mailto:yuzhih...@gmail.com>> wrote:
>> 
>> Have you tried adding hbase client jars to spark.executor.extraClassPath ?
>> 
>> Cheers
>> 
>> On Wed, Feb 10, 2016 at 12:17 AM, Prabhu Joseph <prabhujose.ga...@gmail.com 
>> <mailto:prabhujose.ga...@gmail.com>> wrote:
>> + Spark-Dev
>> 
>> For a Spark job on YARN accessing hbase table, added all hbase client jars 
>> into spark.yarn.dist.files, NodeManager when launching container i.e 
>> executor, does localization and brings all hbase-client jars into executor 
>> CWD, but still the executor tasks fail with ClassNotFoundException of hbase 
>> client jars, when i checked launch container.sh , Classpath does not have 
>> $PWD/* and hence all the hbase client jars are ignored.
>> 
>> Is spark.yarn.dist.files not for adding jars into the executor classpath.
>> 
>> Thanks,
>> Prabhu Joseph 
>> 
>> On Tue, Feb 9, 2016 at 1:42 PM, Prabhu Joseph <prabhujose.ga...@gmail.com 
>> <mailto:prabhujose.ga...@gmail.com>> wrote:
>> Hi All,
>> 
>>  When i do count on a Hbase table from Spark Shell which runs as yarn-client 
>> mode, the job fails at count().
>> 
>> MASTER=yarn-client ./spark-shell
>> 
>> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, 
>> TableName}
>> import org.apache.hadoop.hbase.client.HBaseAdmin
>> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
>>  
>> val conf = HBaseConfiguration.create()
>> conf.set(TableInputFormat.INPUT_TABLE,"spark")
>> 
>> val hBaseRDD = sc.newAPIHadoopRDD(conf, 
>> classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
>> hBaseRDD.count()
>> 
>> 
>> Tasks throw below exception, the actual exception is swallowed, a bug 
>> JDK
