[Structured Streaming] Avoiding multiple streaming queries

2018-02-12 Thread Priyank Shrivastava
I have a structured streaming query which sinks to Kafka.  This query has
complex aggregation logic.


I would like to sink the output DF of this query to multiple Kafka topics,
each partitioned on a different ‘key’ column.  I don’t want a separate Kafka
sink for each of the different Kafka topics, because that would mean running
multiple streaming queries - one for each Kafka topic - especially since my
aggregation logic is complex.


Questions:

1.  Is there a way to output the results of a structured streaming query to
multiple Kafka topics each with a different key column but without having
to execute multiple streaming queries?


2.  If not, would it be efficient to cascade the multiple queries, such
that the first query does the complex aggregation and writes its output
to Kafka, and then the other queries just read the output of the first query
and write to their topics in Kafka, thus avoiding doing the complex
aggregation again?
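For reference on question 1: the Kafka sink available since Spark 2.2 can route each row to the topic named in a per-row "topic" column when no fixed "topic" option is set, so a single query can feed several topics. A rough pyspark sketch (column names, topic names, broker address and checkpoint path below are made up, and aggregated_df stands for the output of the complex aggregation):

from pyspark.sql import functions as F

# Pick a destination topic and a per-topic key for every row; the Kafka sink
# expects string/binary "key" and "value" columns and, absent a "topic" option,
# a "topic" column naming the destination.
value_cols = [F.col(c) for c in aggregated_df.columns]
routed = (aggregated_df
    .withColumn("topic", F.when(F.col("region") == "eu", F.lit("topic_eu"))
                          .otherwise(F.lit("topic_other")))
    .select(F.col("topic"),
            F.when(F.col("topic") == "topic_eu", F.col("customer_id"))
             .otherwise(F.col("order_id")).cast("string").alias("key"),
            F.to_json(F.struct(*value_cols)).alias("value")))

query = (routed.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-broker1:9092")
    .option("checkpointLocation", "/tmp/checkpoints/multi-topic")
    .outputMode("update")      # or whatever mode the aggregation requires
    .start())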


Thanks in advance for any help.


Priyank


Re: Spark on K8s with Romana

2018-02-12 Thread Yinan Li
We actually moved away from using the driver pod IP because of
https://github.com/apache-spark-on-k8s/spark/issues/482. The way this
currently works is that the driver URL is constructed from the value of
"spark.driver.host", which the submission client sets to the DNS name of the
headless driver service. So the assumption is that kube-dns exists and is
working. Unfortunately, this is not configurable.
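As a quick sanity check of that assumption, something like this from inside a pod can confirm that kube-dns resolves the headless driver service (the service name below is made up; the real one is generated per submission):

import socket

# Hypothetical headless-service DNS name; substitute the one from the driver logs.
driver_host = "spark-app-1518472380000-driver-svc.default.svc.cluster.local"
try:
    print(socket.gethostbyname(driver_host))  # should print the driver pod's IP
except socket.gaierror as e:
    print("kube-dns could not resolve the driver service: {0}".format(e))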


On Mon, Feb 12, 2018 at 1:21 PM, Jenna Hoole  wrote:

> So, we've run into something interesting. In our case, we've got some
> proprietary networking HW which is very feature limited in the TCP/IP
> space, so using Romana, executors can't seem to find the driver using the
> hostname lookup method it's attempting. Is there any way to make it use IP?
>
> Thanks,
> Jenna
>


Spark on K8s with Romana

2018-02-12 Thread Jenna Hoole
So, we've run into something interesting. In our case, we've got some
proprietary networking HW which is very feature limited in the TCP/IP
space, so using Romana, executors can't seem to find the driver using the
hostname lookup method it's attempting. Is there any way to make it use IP?

Thanks,
Jenna


org.apache.kafka.clients.consumer.OffsetOutOfRangeException

2018-02-12 Thread Mina Aslani
Hi,

As soon as I submit a Spark app to my cluster, I get the error below:

Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
Offsets out of range with no configured reset policy for partitions:
{topic1-0=304337}

I am using the dependency below:
name: 'spark-streaming-kafka-0-10_2.11', version: '2.2.0'
and I am setting the consumer's reset config (e.g. AUTO_OFFSET_RESET_CONFIG)
to "earliest".
As per https://kafka.apache.org/0110/documentation.html, the exception should
be thrown only when the consumer's reset config has not been set (i.e. the
default, none).
I am wondering what the cause is and how to fix it.

Best regards,
Mina


Re: optimize hive query to move a subset of data from one partition table to another table

2018-02-12 Thread devjyoti patra
Can you try running your query with a static literal for the date filter
(join_date >= SOME 2 MONTH OLD DATE)? I cannot think of any reason why this
query should create more than 60 tasks.
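For illustration, a sketch of that suggestion in pyspark (table and column names follow the original post; the literal date is only an example, and the remark about partition columns is an assumption about the table layout):

# Static literal for the date filter, as suggested above. Filtering on the
# partition columns (year, month, day) instead of join_date would additionally
# let Spark/Hive prune partitions rather than scan the whole table.
spark.sql("""
    CREATE TABLE emp AS
    SELECT *
    FROM emp_full
    WHERE join_date >= '2017-12-12'
    -- or, to keep it rolling: WHERE join_date >= add_months(current_date(), -2)
""")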




On 12 Feb 2018 6:26 am, "amit kumar singh"  wrote:

Hi,

create table emp as select * from emp_full where join_date >= date_sub(join_date, 2)

I am trying to select from one table and insert into another table.

I need a way to select the last 2 months of data every time.

The table is partitioned on year, month, day.

On Sun, Feb 11, 2018 at 4:30 PM, Richard Qiao 
wrote:

> Would you mind sharing your code with us to analyze?
>
> > On Feb 10, 2018, at 10:18 AM, amit kumar singh 
> wrote:
> >
> > Hi Team,
> >
> > We have a Hive external table which has 50 TB of data partitioned on year,
> > month, day.
> >
> > I want to move the last 2 months of data into another table.
> >
> > When I try to do this through Spark, more than 120k tasks are getting
> > created.
> >
> > What is the best way to do this?
> >
> > thanks
> > Rohit
>
>


Re: Schema - DataTypes.NullType

2018-02-12 Thread Jean Georges Perrin
Thanks Nicholas. It makes sense. Now that I have a hint, I can play with it too!

jg

> On Feb 11, 2018, at 19:15, Nicholas Hakobian wrote:
> 
> I spent a few minutes poking around in the source code and found this:
> 
> The data type representing None, used for the types that cannot be inferred.
> 
> https://github.com/apache/spark/blob/branch-2.1/python/pyspark/sql/types.py#L107-L113
>  
> 
> 
> Playing around a bit, this is the only use case that I could immediately come
> up with: you have some kind of placeholder field already in the data, but it is
> always null. If you let createDataFrame (and I bet other things like
> DataFrameReader would behave similarly) try to infer it directly, it will
> error out since it can't infer the schema automatically. Doing something like
> the below will allow the data to be used. And, if memory serves, Hive also has
> a concept of a Null data type for these types of situations.
> 
> In [9]: from pyspark.sql import Row
>    ...: from pyspark.sql.types import StructType, StructField, LongType, NullType
>    ...: df = spark.createDataFrame([Row(id=1, val=None), Row(id=2, val=None)],
>    ...:                            schema=StructType([StructField('id', LongType()),
>    ...:                                               StructField('val', NullType())]))
> 
> In [10]: df.show()
> +---+----+
> | id| val|
> +---+----+
> |  1|null|
> |  2|null|
> +---+----+
> 
> 
> In [11]: df.printSchema()
> root
>  |-- id: long (nullable = true)
>  |-- val: null (nullable = true)
> 
> 
> Nicholas Szandor Hakobian, Ph.D.
> Staff Data Scientist
> Rally Health
> nicholas.hakob...@rallyhealth.com 
> 
> 
> On Sun, Feb 11, 2018 at 5:40 AM, Jean Georges Perrin wrote:
> What is the purpose of DataTypes.NullType, especially as you are building a
> schema? Has anyone used it or seen it as part of a schema auto-generation?
> 
> 
> (If I keep asking long enough, I may get an answer, no? :) )
> 
> 
> > On Feb 4, 2018, at 13:15, Jean Georges Perrin wrote:
> >
> > Any taker on this one? ;)
> >
> >> On Jan 29, 2018, at 16:05, Jean Georges Perrin wrote:
> >>
> >> Hi Sparkians,
> >>
> >> Can someone tell me what is the purpose of DataTypes.NullType, especially
> >> as you are building a schema?
> >>
> >> Thanks
> >>
> >> jg



Re: Efficient way to compare the current row with previous row contents

2018-02-12 Thread Georg Heiler
See
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
and
https://stackoverflow.com/questions/42448564/spark-sql-window-function-with-complex-condition
for a more involved example
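A minimal pyspark sketch of the lag-based comparison (the data, column names and ordering column are made up, since the attached screenshot didn't come through):

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# Toy data standing in for the screenshot; "id" defines the row order.
df = spark.createDataFrame(
    [(1, "a", 10), (2, "a", 10), (3, "b", 10), (4, "b", 12)],
    ["id", "category", "value"])

# Add .partitionBy(...) if there is a natural grouping column, so the
# comparison stays distributed instead of pulling everything into one task.
w = Window.orderBy("id")

# For each attribute, compare the current row's value with the previous row's.
compared = df.select(
    "*",
    *[(F.col(c) == F.lag(c).over(w)).alias(c + "_same_as_prev")
      for c in df.columns if c != "id"])

compared.show()

(Regarding the streaming sub-question in this thread: as of Spark 2.x, this kind of row-ordered lag window is supported on batch DataFrames but not on streaming ones, where only time-based window aggregations are allowed.)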


KhajaAsmath Mohammed  wrote on Mon, Feb 12, 2018
at 15:16:

> I am also looking for the same answer. Will this work in streaming
> application too ??
>
> Sent from my iPhone
>
> On Feb 12, 2018, at 8:12 AM, Debabrata Ghosh 
> wrote:
>
> Georg - Thanks ! Will you be able to help me with a few examples please.
>
> Thanks in advance again !
>
> Cheers,
> D
>
> On Mon, Feb 12, 2018 at 6:03 PM, Georg Heiler 
> wrote:
>
>> You should look into window functions for spark sql.
>> Debabrata Ghosh  wrote on Mon, Feb 12, 2018 at
>> 13:10:
>>
>>> Hi,
>>>  Greetings !
>>>
>>>  I needed some efficient way in pyspark to execute a
>>> comparison (on all the attributes) between the current row and the previous
>>> row. My intent here is to leverage the distributed framework of Spark to
>>> the best extent so that I can achieve good speed. Can anyone please suggest
>>> a suitable algorithm / command? Here is a snapshot of the underlying
>>> data which I need to compare:
>>>
>>> [image: Inline image 1]
>>>
>>> Thanks in advance !
>>>
>>> D
>>>
>>
>


Re: Efficient way to compare the current row with previous row contents

2018-02-12 Thread KhajaAsmath Mohammed
I am also looking for the same answer. Will this work in streaming application 
too ?? 

Sent from my iPhone

> On Feb 12, 2018, at 8:12 AM, Debabrata Ghosh  wrote:
> 
> Georg - Thanks ! Will you be able to help me with a few examples please.
> 
> Thanks in advance again !
> 
> Cheers,
> D
> 
>> On Mon, Feb 12, 2018 at 6:03 PM, Georg Heiler  
>> wrote:
>> You should look into window functions for spark sql. 
>> Debabrata Ghosh  wrote on Mon, Feb 12, 2018 at
>> 13:10:
>>> Hi,
>>>  Greetings !
>>> 
>>>  I needed some efficient way in pyspark to execute a 
>>> comparison (on all the attributes) between the current row and the previous 
>>> row. My intent here is to leverage the distributed framework of Spark to 
>>> the best extent so that I can achieve good speed. Can anyone please suggest
>>> a suitable algorithm / command? Here is a snapshot of the underlying
>>> data which I need to compare:
>>> 
>>> 
>>> 
>>> Thanks in advance !
>>> 
>>> D
> 


Re: Efficient way to compare the current row with previous row contents

2018-02-12 Thread Debabrata Ghosh
Georg - Thanks ! Will you be able to help me with a few examples please.

Thanks in advance again !

Cheers,
D

On Mon, Feb 12, 2018 at 6:03 PM, Georg Heiler 
wrote:

> You should look into window functions for spark sql.
> Debabrata Ghosh  wrote on Mon, Feb 12, 2018 at
> 13:10:
>
>> Hi,
>>  Greetings !
>>
>>  I needed some efficient way in pyspark to execute a
>> comparison (on all the attributes) between the current row and the previous
>> row. My intent here is to leverage the distributed framework of Spark to
>> the best extent so that I can achieve good speed. Can anyone please suggest
>> a suitable algorithm / command? Here is a snapshot of the underlying
>> data which I need to compare:
>>
>> [image: Inline image 1]
>>
>> Thanks in advance !
>>
>> D
>>
>


Spark sortByKey is not lazy evaluated

2018-02-12 Thread sandudi
Hey people, sortByKey is not lazily evaluated in Spark because it uses a
range partitioner, but I have one point of confusion: does it load the whole
data from the source, or only sample data? That is, if I use sortByKey, does
it get the whole data in memory or only a sample, and then load the whole
data into memory the second time, when I run the real action?
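As far as I understand, sortByKey itself runs a small job that scans the input to draw a sample (only the sample is kept in memory) so RangePartitioner can pick partition boundaries; the full read, shuffle and sort happen when a real action runs. A rough sketch (path and parsing are made up):

# Hypothetical input; illustrates which step touches how much data.
pairs = sc.textFile("hdfs:///data/events.csv") \
          .map(lambda line: (line.split(",")[0], line))

sorted_pairs = pairs.sortByKey()    # eagerly runs a sampling job to compute
                                    # RangePartitioner boundaries only

sorted_pairs.saveAsTextFile("hdfs:///data/events_sorted")   # full read, shuffle
                                                            # and sort happen here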



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/




Re: Efficient way to compare the current row with previous row contents

2018-02-12 Thread Georg Heiler
You should look into window functions for spark sql.
Debabrata Ghosh  wrote on Mon, Feb 12, 2018 at
13:10:

> Hi,
>  Greetings !
>
>  I needed some efficient way in pyspark to execute a
> comparison (on all the attributes) between the current row and the previous
> row. My intent here is to leverage the distributed framework of Spark to
> the best extent so that I can achieve good speed. Can anyone please suggest
> a suitable algorithm / command? Here is a snapshot of the underlying
> data which I need to compare:
>
> [image: Inline image 1]
>
> Thanks in advance !
>
> D
>


Efficient way to compare the current row with previous row contents

2018-02-12 Thread Debabrata Ghosh
Hi,
 Greetings !

 I needed some efficient way in pyspark to execute a
comparison (on all the attributes) between the current row and the previous
row. My intent here is to leverage the distributed framework of Spark to
the best extent so that I can achieve good speed. Can anyone please suggest
a suitable algorithm / command? Here is a snapshot of the underlying
data which I need to compare:

[image: Inline image 1]

Thanks in advance !

D


[pyspark] structured streaming deployment & monitoring recommendation

2018-02-12 Thread Bram
Hi,

I have questions regarding Spark structured streaming deployment
recommendations.

I have +- 100 Kafka topics that can be processed using a similar code block.
I am using pyspark 2.2.1.

Here is the idea:


TOPIC_LIST = ["topic_a", "topic_b", "topic_c"]

stream = {}
for t in TOPIC_LIST:
    stream[t] = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers",
                "kafka-broker1:9092,kafka-broker2:9092") \
        .option("subscribe", "{0}".format(t)) \
        .load() \
        .writeStream \
        .trigger(processingTime="10 seconds") \
        .format("console") \
        .start()


So far, I have these options:
1. do a single spark-submit for all topics
2. do multiple spark-submits, one per topic

I am thinking of deploying this using supervisor or upstart so that when the
Spark app is down, I can get notified.

By submitting a single Spark app I can have more control over server
resources, but it is a bit hard to monitor whether one of the queries is
having a problem. For multiple spark-submits I am afraid I need to provide
+- 100 CPU cores, which I doubt will be fully utilized since some of the
topics do not have high throughput; however, by creating +- 100 bash scripts
maintained using supervisor, I can get an alert for each of them.
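One middle ground for the single-submit option might be to poll each query's status inside the same application and hook that into whatever alerting is already in place. A rough sketch (the alert function is a placeholder, and `stream` is the dict of queries built in the loop above):

import time

def alert(message):
    # Placeholder: wire this into email / Slack / supervisor, etc.
    print("ALERT: {0}".format(message))

def monitor(stream, interval=60):
    while True:
        for topic, query in stream.items():
            if query.exception() is not None:
                # query terminated with an error
                alert("query for {0} failed: {1}".format(topic, query.exception()))
            elif query.lastProgress is not None:
                # most recent micro-batch metrics for this topic's query
                progress = query.lastProgress
                print(topic, progress["numInputRows"], progress["durationMs"])
        time.sleep(interval)

monitor(stream)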

For monitoring I am considering using this ->
https://argus-sec.com/monitoring-spark-prometheus/

Any recommendations or advice, guys?

Thanks

Regards,

Abraham