Train ML models on each partition

2019-05-08 Thread Qian He
I have a 1 TB dataset with 100 columns. The first column is a user_id; there
are about 1000 unique user_ids in this dataset.

The use case: I want to train an ML model for each user_id on that user's
records (approximately 1 GB of records per user). Say the ML model is a
decision tree. It is not feasible to create 1000 Spark applications to
achieve this. Can I launch just one Spark application and accomplish the
training of these 1000 DT models? How?

Can I just partition the 1 TB of data by user_id, and then train a model for
each partition?

Thanks!
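
A minimal sketch of the single-application approach described above, fitting
one Spark ML DecisionTreeClassifier per user_id against a cached DataFrame.
The input path and the label/features column names are assumptions, and the
~1000 fits run sequentially within the one application.

import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("per-user-trees").getOrCreate()

// Cache the dataset so each per-user filter does not rescan the full 1 TB source.
val df = spark.read.parquet("/path/to/dataset").cache()   // hypothetical path

// Roughly 1000 distinct user_ids, collected to the driver.
val userIds = df.select("user_id").distinct().collect().map(_.get(0))

// Fit one decision tree per user; each fit sees roughly 1 GB of records.
// "label" and "features" are assumed column names (features must be a Vector column).
val models = userIds.map { uid =>
  val perUser = df.filter(df("user_id") === uid)
  val dt = new DecisionTreeClassifier()
    .setLabelCol("label")
    .setFeaturesCol("features")
  uid -> dt.fit(perUser)
}.toMap

A grouped approach (training a single-machine learner inside each user_id
group) avoids the sequential fits, but the filter-and-fit loop above is the
most direct answer to "one application, 1000 models".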


Re: Static partitioning in partitionBy()

2019-05-08 Thread Gourav Sengupta
Hi Burak,
Hurray, so you finally made Delta open source :)
I have always meant to ask TD: is there any chance we could get the
streaming graphs back in the Spark UI? That would just be wonderful.

Hi Shubham,
there is always an easier way and a super fancy way to solve a problem;
filtering the data before persisting it is the simple way. Similarly, a simple
way to handle data skew is to use the monotonically_increasing_id function in
Spark together with the modulus operator (a sketch follows below). As for the
fancy way, I am sure someone in the world is working on it for mere mortals
like me :)
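
A rough sketch of that simple approach (the column name "c" and the bucket
count of 16 are assumptions, not from the thread): derive a salt from
monotonically_increasing_id with the modulus operator, then repartition on the
skewed key together with the salt.

import org.apache.spark.sql.functions.{col, monotonically_increasing_id}

// Spread rows for a skewed key "c" over 16 sub-buckets via a salt column.
val salted = df
  .withColumn("salt", monotonically_increasing_id() % 16)
  .repartition(col("c"), col("salt"))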


Regards,
Gourav Sengupta





On Wed, May 8, 2019 at 1:41 PM Shubham Chaurasia 
wrote:

> Thanks
>
> On Wed, May 8, 2019 at 10:36 AM Felix Cheung 
> wrote:
>
>> You could
>>
>> df.filter(col("c") === "c1").write.partitionBy("c").save
>>
>> It could run into a data skew problem, but it might work for you
>>
>>
>>
>> --
>> *From:* Burak Yavuz 
>> *Sent:* Tuesday, May 7, 2019 9:35:10 AM
>> *To:* Shubham Chaurasia
>> *Cc:* dev; user@spark.apache.org
>> *Subject:* Re: Static partitioning in partitionBy()
>>
>> It depends on the data source. Delta Lake (https://delta.io) allows you
>> to do it with the .option("replaceWhere", "c = c1"). With other file
>> formats, you can write directly into the partition directory
>> (tablePath/c=c1), but you lose atomicity.
>>
>> On Tue, May 7, 2019, 6:36 AM Shubham Chaurasia 
>> wrote:
>>
>>> Hi All,
>>>
>>> Is there a way I can provide static partitions in partitionBy()?
>>>
>>> Like:
>>>
>>> df.write.mode("overwrite").format("MyDataSource").partitionBy("c=c1").save
>>>
>>> Above code gives following error as it tries to find column `c=c1` in df.
>>>
>>> org.apache.spark.sql.AnalysisException: Partition column `c=c1` not
>>> found in schema struct;
>>>
>>> Thanks,
>>> Shubham
>>>
>>


pyspark on pycharm error

2019-05-08 Thread karan alang
Hello - if anyone has any ideas on this PySpark/PyCharm error on SO, please
let me know.


https://stackoverflow.com/questions/56028402/java-util-nosuchelementexception-key-not-found-pyspark-driver-callback-host



Essentially, running PySpark from PyCharm, I get the following error:

java.util.NoSuchElementException: key not found: _PYSPARK_DRIVER_CALLBACK_HOST

Exception: Java gateway process exited before sending its port number


IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff] while using spark-sql-2.4.1v to read data from oracle

2019-05-08 Thread Shyam P
Hi,
I have an Oracle table in which a column has the schema
DATA_DATE DATE, with values like 31-MAR-02.

I am trying to retrieve data from Oracle using spark-sql-2.4.1. I
tried to set the JDBC options as below:


.option("lowerBound", "2002-03-31 00:00:00");
.option("upperBound", "2019-05-01 23:59:59");
.option("timestampFormat", "-mm-dd hh:mm:ss");
.option("partitionColumn", "DATA_DATE");
.option("numPartitions", 240);

But it gives this error:

java.lang.IllegalArgumentException: Timestamp format must be yyyy-mm-dd
hh:mm:ss[.fffffffff]
at java.sql.Timestamp.valueOf(Timestamp.java:204)
at
org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.toInternalBoundValue(JDBCRelation.scala:179)

Any clue how this needs to be handled/fixed?

https://stackoverflow.com/questions/56020103/how-to-pass-date-timestamp-as-lowerbound-upperbound-in-spark-sql-2-4-1v


Any help is highly appreciated.

Regards,
Shyam
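
For reference, a minimal sketch of how these partitioning options sit in a
full JDBC read (the connection details and table name are placeholders; the
lowerBound/upperBound strings follow the yyyy-mm-dd hh:mm:ss form that
java.sql.Timestamp.valueOf expects):

// Hypothetical connection details; only the partitioning options come from the thread.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:oracle:thin:@//dbhost:1521/service")
  .option("dbtable", "MY_SCHEMA.MY_TABLE")
  .option("user", "user")
  .option("password", "password")
  .option("partitionColumn", "DATA_DATE")
  .option("lowerBound", "2002-03-31 00:00:00")
  .option("upperBound", "2019-05-01 23:59:59")
  .option("numPartitions", "240")
  .load()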


Re: Static partitioning in partitionBy()

2019-05-08 Thread Shubham Chaurasia
Thanks

On Wed, May 8, 2019 at 10:36 AM Felix Cheung 
wrote:

> You could
>
> df.filter(col("c") === "c1").write.partitionBy("c").save
>
> It could run into a data skew problem, but it might work for you
>
>
>
> --
> *From:* Burak Yavuz 
> *Sent:* Tuesday, May 7, 2019 9:35:10 AM
> *To:* Shubham Chaurasia
> *Cc:* dev; user@spark.apache.org
> *Subject:* Re: Static partitioning in partitionBy()
>
> It depends on the data source. Delta Lake (https://delta.io) allows you
> to do it with the .option("replaceWhere", "c = c1"). With other file
> formats, you can write directly into the partition directory
> (tablePath/c=c1), but you lose atomicity.
>
> On Tue, May 7, 2019, 6:36 AM Shubham Chaurasia 
> wrote:
>
>> Hi All,
>>
>> Is there a way I can provide static partitions in partitionBy()?
>>
>> Like:
>> df.write.mode("overwrite").format("MyDataSource").partitionBy("c=c1").save
>>
>> Above code gives following error as it tries to find column `c=c1` in df.
>>
>> org.apache.spark.sql.AnalysisException: Partition column `c=c1` not found
>> in schema struct;
>>
>> Thanks,
>> Shubham
>>
>
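
For reference, a minimal sketch of the replaceWhere option mentioned above
(assuming the table already exists as a Delta table partitioned by "c"; the
path is hypothetical):

import org.apache.spark.sql.functions.col

// Atomically overwrite only the c = 'c1' partition of an existing Delta table.
df.filter(col("c") === "c1")
  .write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", "c = 'c1'")
  .save("/path/to/tablePath")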


HiveTableRelation Assertion Error | Joining Stream with Hive table

2019-05-08 Thread Manjunath N
Hi,

I have a dataframe reading messages from a Kafka topic and another
dataframe created from a Hive table. When I join (inner) the stream with the
Hive table, I am getting the assertion error below.

java.lang.AssertionError: assertion failed: No plan for HiveTableRelation

When I do an explain on the joined dataframe, I get the following error.

java.lang.UnsupportedOperationException: LeafNode StreamingRelation must
implement statistics.

Any hints to resolve this issue would be helpful.

I am using Spark 2.2.0.

Thanks,
Manjunath
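
For context, a sketch of the stream-static join being described (the Kafka
settings, Hive table name, and join key are placeholders):

// Streaming side: messages from a Kafka topic, parsed into a "key" column (hypothetical).
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "X")
  .option("subscribe", "X")
  .load()
  .selectExpr("CAST(value AS STRING) AS key")

// Static side: the Hive table, read through the session's Hive-enabled catalog.
val hiveDf = spark.table("mydb.my_hive_table")

// Inner stream-static join on the shared key column.
val joined = stream.join(hiveDf, Seq("key"))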


How does org.apache.spark.sql.catalyst.util.MapData support hash lookup?

2019-05-08 Thread Shawn Yang
Hi guys,
I'm reading the Spark source code. When I read
org.apache.spark.sql.catalyst.util.ArrayBasedMapData and
org.apache.spark.sql.catalyst.expressions.UnsafeMapData, I can't understand
how they support hash lookup. Is there anything I'm missing?


Re: Structured Streaming Kafka - Weird behavior with performance and logs

2019-05-08 Thread Akshay Bhardwaj
Hi Austin,

A few questions:

   1. What is the partition count of the Kafka topics used for the input and
   output data?
   2. In the write stream, I would recommend using "trigger" with a defined
   interval, if you prefer a micro-batching strategy,
   3. along with defining "maxOffsetsPerTrigger" in the Kafka readStream
   options, which lets you choose the number of messages you want per trigger.
   (This helps in maintaining the expected executor/memory threshold for the
   cluster; a short sketch of points 2 and 3 follows below.)
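
A short sketch of points 2 and 3 (topic names, the interval, and the offset
cap are placeholder values):

import org.apache.spark.sql.streaming.Trigger

// Point 3: cap how many offsets are consumed per micro-batch.
val input = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "X")
  .option("subscribe", "X")
  .option("maxOffsetsPerTrigger", "10000")
  .load()

// Point 2: fire a micro-batch on a defined interval.
val query = input.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "X")
  .option("topic", "X")
  .option("checkpointLocation", "/checkpoints/example")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .outputMode("append")
  .start()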

For the repeated log messages, look in your logs at the streaming query
progress that is published. This progress status displays a lot of metrics,
which should be your first diagnostic for identifying issues.
With a Kafka stream, the progress status shows the "startOffset" and
"endOffset" values per batch, listed per topic-partition as the start to end
offsets consumed in each trigger batch of the streaming query.


Akshay Bhardwaj
+91-97111-33849


On Tue, May 7, 2019 at 8:02 PM Austin Weaver  wrote:

> Hey Spark Experts,
>
> After listening to some of you, and to the presentations at Spark Summit in
> SF, I am transitioning from DStreams to Structured Streaming; however, I am
> seeing some weird results.
>
> My use case is as follows: I am reading in a stream from a Kafka topic,
> transforming each message, and writing the transformed message to another
> Kafka topic.
>
> While running my stream, I can see the transformed messages on the output
> topic so I know the basic structure of my stream seems to be running as
> intended.
>
> Inside my transformation, I am logging the total transform time as well as
> the raw message being transformed. (Java by the way)
>
> The 2 weird things I am seeing:
> 1) I am seeing that the consumer lag for this particular consumer group on
> the input topic is increasing. This does not make sense to me - looking at
> the transform times in the logs, it should easily be able to handle the
> incoming feed. To give an example, the transform times are < 10 ms per
> record and the sample of data does not contain > 100 messages per second.
> The stream should be reducing consumer lag as it runs (especially
> considering multiple workers and partitions).
>
> 2) I am seeing the same log transformation messages over and over in the
> Dataproc Spark cluster logs. For example, I am currently looking at my logs
> and the last 20+ log messages are exactly the same.
>
> I thought 2 may be due to offsets not being handled correctly, but I am
> seeing a reasonable range of transformed messages on the target topic, and
> I'm using Spark's built-in checkpointing to handle the offsets for me.
>
> In terms of 1, why would I be seeing the same log messages over and over?
> It doesn't make sense to me - wouldn't the message only be transformed once
> and its offset committed?
>
> If anything stands out as incorrect, or is something you have seen, please
> let me know - this is rather new to me, and my code seems to follow the same
> pattern as other examples I have seen across the net.
>
> Here's a redacted snippet of my stream:
>
> spark.readStream().format("kafka")
>     .option("kafka.bootstrap.servers", "X")
>     .option("kafka.partition.assignment.strategy",
>             RoundRobinAssignor.class.getName())
>     .option("subscribe", "X")
>     .option("startingOffsets", "earliest")
>     .load()
>     .select("value")
>     .as(Encoders.STRING())
>     .map((MapFunction<String, String>) value -> transform(value),
>             Encoders.STRING())
>     .writeStream()
>     .format("kafka")
>     .option("kafka.bootstrap.servers", "X")
>     .option("topic", "X")
>     .outputMode("append")
>     .option("checkpointLocation", "/checkpoints/testCheckpoint")
>     .start()
>     .awaitTermination();
>
> Thanks!
> Austin
>