Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-30 Thread kant kodali
That makes a lot of sense now! I am looking for a tumbling window, so my
window interval and batch interval are both 24 hours. Every day I want to
start with a fresh state.
Finally, since you said I need to do bookkeeping of the "last 24 hours": do
you mean I need to do this in some external store and then compute the max?
Unless I store the history in some external store, I don't see a way to
retrieve all of it, especially since GroupState.get() seems to return only the
most recently updated state, not the entire history.
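
For reference, here is a minimal sketch of the kind of in-state bookkeeping
discussed in the reply below. This is not code from the thread: it assumes a
trainTimesDataset of type Dataset[TrainEvent] and spark.implicits._ in scope,
and it prunes by processing time purely to keep the sketch short.

import java.sql.Timestamp
import org.apache.spark.sql.streaming.GroupState

// Illustrative types; the thread's schema is [train: Int, dest: String, time: Timestamp].
case class TrainEvent(train: Int, dest: String, time: Timestamp)
case class TrainHistory(events: List[TrainEvent])

def updateHistoryAndGetMax(
    train: Int,
    newEvents: Iterator[TrainEvent],
    state: GroupState[TrainHistory]): (Int, Timestamp) = {
  val previous = if (state.exists) state.get.events else Nil
  // Bookkeeping: keep only the last 24 hours of events.
  val cutoff = System.currentTimeMillis() - 24L * 60 * 60 * 1000
  val kept = (previous ++ newEvents).filter(_.time.getTime >= cutoff)
  state.update(TrainHistory(kept))
  val maxMillis = if (kept.nonEmpty) kept.map(_.time.getTime).max else cutoff
  (train, new Timestamp(maxMillis))
}

val maxPerTrain = trainTimesDataset      // Dataset[TrainEvent], assumed given
  .groupByKey(_.train)
  .mapGroupsWithState(updateHistoryAndGetMax _)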


On Wed, Aug 30, 2017 at 4:09 PM, Tathagata Das 
wrote:

> The per-key state S is kept in the memory. It has to be of a type that can
> be encoded by Datasets. All you have to do is update S every time the
> function is called, and the engine takes care of serializing/checkpointing
> the state value, and retrieving the correct version of the value when
> restarting from failures. So you explicitly don't have to "store" the state
> anywhere, the engine takes care of it under the hood. Internally, there is
> an interface called StateStore, which defines the component that is actually
> responsible for checkpointing the values, etc. And there is a single
> implementation of the store that keeps the values in a hashmap and writes all
> changes to the values to an HDFS-API-compatible fault-tolerant filesystem for
> checkpointing. With this, by default, you really don't have to worry about
> externalizing it, and you don't have to overload anything in GroupState. You
> just use it as the example shows.
>
> It's important to note that all the state of all the keys is distributed
> over the executors. So each executor will have in its memory a fraction of
> all the train state. Depending on the number of trains, and the amount
> of data in the state, you will have to size the cluster and the workers
> accordingly. If you keep a lot of state for each train, then your overall
> memory requirements are going to increase. So you have to be judicious
> about how much data to keep as state data for each key.
>
> Regarding aggregation vs mapGroupsWithState, it's a trade-off between
> efficiency and flexibility. With aggregation, you can do a sliding window of
> "24 hours" sliding every "1 hour", which will give the max over the "last 24 hours"
> updated every "1 hour". If you are okay with this approximation, then this
> is easiest to implement (don't forget setting watermarks) and most
> efficient. If you really want something more precise than that, then
> mapGroupsWithState is the ultimate flexible tool. However, you have to do
> bookkeeping of "last 24 hours" and calculate the max yourself. :)
>
> Hope this helps.
>
> On Wed, Aug 30, 2017 at 10:58 AM, kant kodali  wrote:
>
>> I think I understand *groupByKey/**mapGroupsWithState* and I am still
>> trying to wrap my head around *GroupState*. So I believe I have a
>> naive question to ask about *GroupState*.
>>
>> If I were to represent a state that has a history of events (say 24 hours),
>> and say the number of events can be big for a given 24-hour period, where
>> do I store the state S? An external store like Kafka, a database, or a
>> distributed file system? I wonder if I can represent the state S using a
>> DataSet that represents the history of events? GroupState also has
>> .exists() and .get(); if I am not wrong, should I override these methods
>> so that comparisons and retrieval from an external store can work?
>>
>> Thanks!
>>
>>
>>
>> On Wed, Aug 30, 2017 at 1:39 AM, kant kodali  wrote:
>>
>>> Hi TD,
>>>
>>> Thanks for the explanation and for the clear pseudo code and an example!
>>>
>>> mapGroupsWithState is cool and looks very flexible, however I have a few
>>> concerns and questions. For example:
>>>
>>> Say I store TrainHistory as a max heap from the Java Collections library
>>> and I keep adding to this heap for 24 hours; at some point I will run
>>> out of Java heap space, right? Do I need to store TrainHistory as a
>>> DataSet or DataFrame instead of an in-memory max heap object from the Java
>>> Collections library?
>>>
>>> I wonder between *Nested query* vs  *groupByKey/**mapGroupsWithState*
>>> which approach is more efficient to solve this particular problem ?
>>>
>>> Thanks!
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Aug 29, 2017 at 9:50 PM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
 Aah, I might have misinterpreted. The groupBy + window solution would
 give the max time for each train over 24 hours (non-overlapping window) of
 event data (timestamped by activity_timestamp). So the output would be
 like.

 Train Dest   Window(activity_timestamp)max(Time)
 1 

Re: Time window on Processing Time

2017-08-30 Thread madhu phatak
Hi,
That's great. Thanks a lot.
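
For reference, TD's snippet below expands into something like this minimal,
self-contained sketch. The rate source and the console sink are only
stand-ins, not part of the original suggestion.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object ProcessingTimeWindowSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("ProcessingTimeWindowSketch").master("local[2]").getOrCreate()

    // Stand-in input: the built-in "rate" source; any streaming source works.
    val input = spark.readStream.format("rate").option("rowsPerSecond", "10").load()

    // Stamp each row with the wall-clock time at processing and window on it.
    val counts = input
      .withColumn("processingTime", current_timestamp())
      .groupBy(window(col("processingTime"), "1 minute"))
      .count()

    counts.writeStream.outputMode("complete").format("console")
      .start().awaitTermination()
  }
}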

On Wed, Aug 30, 2017 at 10:44 AM, Tathagata Das  wrote:

> Yes, it can be! There is a sql function called current_timestamp() which
> is self-explanatory. So I believe you should be able to do something like
>
> import org.apache.spark.sql.functions._
>
> ds.withColumn("processingTime", current_timestamp())
>   .groupBy(window("processingTime", "1 minute"))
>   .count()
>
>
> On Mon, Aug 28, 2017 at 5:46 AM, madhu phatak 
> wrote:
>
>> Hi,
>> As I am playing with structured streaming, I observed that the window
>> function always requires a time column in the input data. So that means it's
>> event time.
>>
>> Is it possible to do old Spark Streaming style window functions based on
>> processing time? I don't see any documentation on this.
>>
>> --
>> Regards,
>> Madhukara Phatak
>> http://datamantra.io/
>>
>
>


-- 
Regards,
Madhukara Phatak
http://datamantra.io/


Re: Do we always need to go through spark-submit?

2017-08-30 Thread vaquar khan
Hi Kant,

Ans: Yes

The org.apache.spark.launcher package provides classes for launching Spark
jobs as child processes using a simple Java API.
*Doc:*  https://spark.apache.org/docs/latest/rdd-programming-guide.html


*Library for launching Spark applications.*

This library allows applications to launch Spark programmatically. There's
only one entry point to the library - the SparkLauncher class.

The SparkLauncher.startApplication(org.apache.spark.launcher.SparkAppHandle.Listener...)
method can be used to start Spark and provide a handle to monitor and control
the running application:


   import org.apache.spark.launcher.SparkAppHandle;
   import org.apache.spark.launcher.SparkLauncher;

   public class MyLauncher {
     public static void main(String[] args) throws Exception {
       SparkAppHandle handle = new SparkLauncher()
           .setAppResource("/my/app.jar")
           .setMainClass("my.spark.app.Main")
           .setMaster("local")
           .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
           .startApplication();
       // Use handle API to monitor / control application.
     }
   }


It's also possible to launch a raw child process, using the
SparkLauncher.launch() method:



   import org.apache.spark.launcher.SparkLauncher;

   public class MyLauncher {
     public static void main(String[] args) throws Exception {
       Process spark = new SparkLauncher()
           .setAppResource("/my/app.jar")
           .setMainClass("my.spark.app.Main")
           .setMaster("local")
           .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
           .launch();
       spark.waitFor();
     }
   }


*Note:*

Normally, a user application is launched using the bin/spark-submit script.
This script takes care of setting up the classpath with Spark and its
dependencies, and can support the different cluster managers and deploy modes
that Spark supports.

Regards,
Vaquar khan

On Wed, Aug 30, 2017 at 3:58 PM, Irving Duran 
wrote:

> I don't know how this would work, but maybe your .jar calls spark-submit
> from within your jar if you were to compile the jar with the spark-submit
> class.
>
>
> Thank You,
>
> Irving Duran
>
> On Wed, Aug 30, 2017 at 10:57 AM, kant kodali  wrote:
>
>> Hi All,
>>
>> I understand spark-submit sets up its own class loader and other things
>> but I am wondering if it  is possible to just compile the code and run it
>> using "java -jar mysparkapp.jar" ?
>>
>> Thanks,
>> kant
>>
>
>


-- 
Regards,
Vaquar Khan
+1 -224-436-0783
Greater Chicago


Re: [Spark] Can Apache Spark be used with time series processing?

2017-08-30 Thread vaquar khan
Hi Alex,

Hope the following links help you to understand why Spark is good for your
use case; a rough sketch follows the links below.



   - https://www.youtube.com/watch?v=tKkneWcAIqU=youtu.be
   -
   
https://blog.cloudera.com/blog/2015/12/spark-ts-a-new-library-for-analyzing-time-series-data-with-apache-spark/
   - http://ampcamp.berkeley.edu/6/exercises/time-series-tutorial-taxis.html
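
For a concrete starting point, here is a hedged structured-streaming sketch of
the "30% change within 5 minutes" check. None of it comes from the links
above: the rate source is only a stand-in for the real metric stream, and the
seriesId/metric columns are fabricated for illustration.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object ThresholdAlertSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("ThresholdAlertSketch")
      .master("local[2]").getOrCreate()
    import spark.implicits._

    // Stand-in source: the built-in "rate" source emits (timestamp, value).
    val events = spark.readStream.format("rate")
      .option("rowsPerSecond", "100").load()
      .select(
        $"timestamp".as("ts"),
        pmod($"value", lit(10)).as("seriesId"),          // fake series key
        ($"value" % 97).cast("double").as("metric"))     // fake metric value

    // Flag 5-minute windows (sliding every minute) where the metric moved
    // more than 30% for a series.
    val alerts = events
      .withWatermark("ts", "10 minutes")
      .groupBy(window($"ts", "5 minutes", "1 minute"), $"seriesId")
      .agg(max($"metric").as("maxV"), min($"metric").as("minV"))
      .where($"minV" > 0 && (($"maxV" - $"minV") / $"minV") > 0.3)

    alerts.writeStream.outputMode("update").format("console")
      .start().awaitTermination()
  }
}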


Regards,
Vaquar khan

On Wed, Aug 30, 2017 at 1:21 PM, Irving Duran 
wrote:

> I think it will work.  Might want to explore spark streams.
>
>
> Thank You,
>
> Irving Duran
>
> On Wed, Aug 30, 2017 at 10:50 AM,  wrote:
>
>> I don't see why not
>>
>> Sent from my iPhone
>>
>> > On Aug 24, 2017, at 1:52 PM, Alexandr Porunov <
>> alexandr.poru...@gmail.com> wrote:
>> >
>> > Hello,
>> >
>> > I am new in Apache Spark. I need to process different time series data
>> (numeric values which depend on time) and react on next actions:
>> > 1. Data is changing up or down too fast.
>> > 2. Data is changing constantly up or down too long.
>> >
>> > For example, if the data have changed 30% up or down in the last five
>> minutes (or less), then I need to send a special event.
>> > If the data have changed 50% up or down in two hours (or less), then I
>> need to send a special event.
>> >
>> > Frequency of data changing is about 1000-3000 per second. And I need to
>> react as soon as possible.
>> >
>> > Does Apache Spark fit well for this scenario or I need to search for
>> another solution?
>> > Sorry for stupid question, but I am a total newbie.
>> >
>> > Regards
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


-- 
Regards,
Vaquar Khan
+1 -224-436-0783
Greater Chicago


Re: Different watermark for different kafka partitions in Structured Streaming

2017-08-30 Thread Tathagata Das
Why not set the watermark to be looser, one that works across all
partitions? The main usage of watermark is to drop state. If you loosen the
watermark threshold (e.g. from 1 hour to 10 hours), then you will keep more
state with older data, but you are guaranteed that you will not drop
important data.
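
As a minimal illustration of that trade-off (the variable and column names here
are assumptions, not from the thread: "logs" stands for the Kafka-backed
streaming DataFrame and "eventTime" for its event-time column):

import org.apache.spark.sql.functions._

// Loosening the watermark from 1 hour to 10 hours keeps roughly ten hours of
// window state around, but events from slowly consumed partitions that arrive
// up to ten hours "late" are still accepted instead of being dropped.
val counts = logs
  .withWatermark("eventTime", "10 hours")     // was "1 hour"
  .groupBy(window(col("eventTime"), "1 hour"))
  .count()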

On Wed, Aug 30, 2017 at 7:41 AM, KevinZwx  wrote:

> Hi,
>
> I'm working with Structured Streaming to process logs from kafka and use
> watermark to handle late events. Currently the watermark is computed by
> (max
> event time seen by the engine - late threshold), and the same watermark is
> used for all partitions.
>
> But in production environment it happens frequently that different
> partition
> is consumed at different speed, the consumption of some partitions may be
> left behind, so the newest event time in these partitions may be much
> smaller than the others'. In this case using the same watermark for
> all
> partitions may cause heavy data loss.
>
> So is there any way to achieve different watermark for different kafka
> partition or any plan to work on this?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-30 Thread Tathagata Das
The per-key state S is kept in the memory. It has to be of a type that can
be encoded by Datasets. All you have to do is update S every time the
function is called, and the engine takes care of serializing/checkpointing
the state value, and retrieving the correct version of the value when
restarting from failures. So you explicitly don't have to "store" the state
anywhere, the engine takes care of it under the hood. Internally, there is
an interface called StateStore, which defines the component that is actually
responsible for checkpointing the values, etc. And there is a single
implementation of the store that keeps the values in a hashmap and writes all
changes to the values to an HDFS-API-compatible fault-tolerant filesystem for
checkpointing. With this, by default, you really don't have to worry about
externalizing it, and you don't have to overload anything in GroupState. You
just use it as the example shows.

It's important to note that all the state of all the keys is distributed
over the executors. So each executor will have in its memory a fraction of
all the train state. Depending on the number of trains, and the amount
of data in the state, you will have to size the cluster and the workers
accordingly. If you keep a lot of state for each train, then your overall
memory requirements are going to increase. So you have to be judicious
about how much data to keep as state data for each key.

Regarding aggregation vs mapGroupsWithState, it's a trade-off between
efficiency and flexibility. With aggregation, you can do a sliding window of
"24 hours" sliding every "1 hour", which will give the max over the "last 24 hours"
updated every "1 hour". If you are okay with this approximation, then this
is easiest to implement (don't forget setting watermarks) and most
efficient. If you really want something more precise than that, then
mapGroupsWithState is the ultimate flexible tool. However, you have to do
bookkeeping of "last 24 hours" and calculate the max yourself. :)

Hope this helps.
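
For reference, a minimal sketch of the approximate sliding-window variant
described above, using the schema mentioned elsewhere in the thread
([train: Int, dest: String, time: Timestamp] plus an activity_timestamp
event-time column); trainTimesDataset is assumed given and the watermark value
is illustrative.

import org.apache.spark.sql.functions._

// Max "time" per train/dest over the last 24 hours of event time, recomputed
// on a 1-hour slide. Windows older than the watermark are dropped from state.
val maxTimes = trainTimesDataset
  .withWatermark("activity_timestamp", "1 day")
  .groupBy(window(col("activity_timestamp"), "24 hours", "1 hour"),
           col("train"), col("dest"))
  .agg(max(col("time")).as("maxTime"))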

On Wed, Aug 30, 2017 at 10:58 AM, kant kodali  wrote:

> I think I understand *groupByKey/**mapGroupsWithState* and I am still
> trying to wrap my head around *GroupState*. So I believe I have a
> naive question to ask about *GroupState*.
>
> If I were to represent a state that has a history of events (say 24 hours),
> and say the number of events can be big for a given 24-hour period, where
> do I store the state S? An external store like Kafka, a database, or a
> distributed file system? I wonder if I can represent the state S using a
> DataSet that represents the history of events? GroupState also has
> .exists() and .get(); if I am not wrong, should I override these methods
> so that comparisons and retrieval from an external store can work?
>
> Thanks!
>
>
>
> On Wed, Aug 30, 2017 at 1:39 AM, kant kodali  wrote:
>
>> Hi TD,
>>
>> Thanks for the explanation and for the clear pseudo code and an example!
>>
>> mapGroupsWithState is cool and looks very flexible, however I have a few
>> concerns and questions. For example:
>>
>> Say I store TrainHistory as a max heap from the Java Collections library
>> and I keep adding to this heap for 24 hours; at some point I will run
>> out of Java heap space, right? Do I need to store TrainHistory as a
>> DataSet or DataFrame instead of an in-memory max heap object from the Java
>> Collections library?
>>
>> I wonder between *Nested query* vs  *groupByKey/**mapGroupsWithState*
>> which approach is more efficient to solve this particular problem ?
>>
>> Thanks!
>>
>>
>>
>>
>>
>> On Tue, Aug 29, 2017 at 9:50 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Aah, I might have misinterpreted. The groupBy + window solution would
>>> give the max time for each train over 24 hours (non-overlapping window) of
>>> event data (timestamped by activity_timestamp). So the output would be
>>> like.
>>>
>>> Train  Dest  Window(activity_timestamp)    max(Time)
>>> 1      HK    Aug28-00:00 to Aug29-00:00    10:00    <- updating
>>> currently through aug29
>>> 1      HK    Aug27-00:00 to Aug28-00:00    09:00    <- not updating
>>> as no new updates coming in with activity_timestamp in this range.
>>>
>>> The drawback of this approach is that as soon as Aug28 starts, you have
>>> wait for new event about a train to get a new max(time). You may rather
>>> want a rolling 24 hour period, that is, the max time known over events in
>>> the last 24 hours.
>>> Then maintaining our own custom state using mapGroupsWithState/
>>> flatMapGroupsWithState() is the best and most flexible option.
>>> It is available in Spark 2.2 in Scala, Java.
>>>
>>> Here is an example that tracks 

Re: Do we always need to go through spark-submit?

2017-08-30 Thread Irving Duran
I don't know how this would work, but maybe your .jar calls spark-submit
from within your jar if you were to compile the jar with the spark-submit
class.


Thank You,

Irving Duran

On Wed, Aug 30, 2017 at 10:57 AM, kant kodali  wrote:

> Hi All,
>
> I understand spark-submit sets up its own class loader and other things
> but I am wondering if it  is possible to just compile the code and run it
> using "java -jar mysparkapp.jar" ?
>
> Thanks,
> kant
>


Re: Python UDF to convert timestamps (performance question)

2017-08-30 Thread Brian Wylie
Tathagata,

Thanks, your explanation was great.

The suggestion worked well, the only minutia being that I needed to have
the TS field brought in as a DoubleType() or the time got truncated.

Thanks again,
-Brian








On Wed, Aug 30, 2017 at 1:34 PM, Tathagata Das 
wrote:

> 1. Generally, adding columns, etc. will not affect performance because
> Spark's optimizer will automatically figure out columns that are not needed
> and eliminate them in the optimization step. So that should never be a concern.
> 2. Again, this is generally not a concern as the optimizer will take care
> of moving such expressions around.
> 3. However, using a Python UDF is bad for perf. In your case, if the
> problem is that the timestamp is a float, you can cast the float to the
> timestamp type, and it should automatically convert it correctly.
> Something like this: *col("ts").cast("timestamp")*
>
> On Wed, Aug 30, 2017 at 11:45 AM, Brian Wylie 
> wrote:
>
>> Hi All,
>>
>> I'm using structured streaming in Spark 2.2.
>>
>> I'm using PySpark and I have data (from a Kafka publisher) where the
>> timestamp is a float that looks like this:  1379288667.631940
>>
>> So here's my code (which is working fine)
>>
>> # SUBSCRIBE: Setup connection to Kafka Stream
>> raw_data = spark.readStream.format('kafka') \
>>   .option('kafka.bootstrap.servers', 'localhost:9092') \
>>   .option('subscribe', 'dns') \
>>   .option('startingOffsets', 'latest') \
>>   .load()
>>
>> # ETL: Hardcoded Schema for DNS records (do this better later)
>> from pyspark.sql.types import StructType, StringType, BooleanType,
>> IntegerType, FloatType
>> from pyspark.sql.functions import from_json, to_json, col, struct
>>
>> dns_schema = StructType() \
>> .add('ts', FloatType()) \
>> .add('uid', StringType()) \
>> .add('id.orig_h', StringType()) \
>>   
>>
>> # ETL: Convert raw data into parsed and proper typed data
>> from pyspark.sql.functions import col, length, to_timestamp
>>
>> parsed_data = raw_data \
>>   .select(from_json(col("value").cast("string"),
>> dns_schema).alias('data')) \
>>   .select('data.*')
>>
>> # Convert Bro IDS time to an actual TimeStamp type
>> from pyspark.sql.functions import udf
>> import datetime
>> my_udf = udf(lambda x: datetime.datetime.fromtimestamp(float(x)),
>> TimestampType())
>> parsed_data_with_dt = parsed_data.withColumn('dt', my_udf('ts'))
>>
>> # Then a writestream later...
>>
>> Okay so all this code works fine (the 'dt' field has exactly what I
>> want)... but I'll be streaming in a lot of data so here's the questions:
>>
>> - Will the creation of a new dataframe withColumn basically kill
>> performance?
>> - Should I move my UDF into the parsed_data.select(...)  part?
>> - Can my UDF be done by spark.sql directly? (I tried to_timestamp but
>> without luck)
>>
>> Any suggestions/pointers are greatly appreciated.
>>
>> -Brian Wylie
>>
>>
>>
>


Re: Python UDF to convert timestamps (performance question)

2017-08-30 Thread Tathagata Das
1. Generally, adding columns, etc. will not affect performance because
Spark's optimizer will automatically figure out columns that are not needed
and eliminate them in the optimization step. So that should never be a concern.
2. Again, this is generally not a concern as the optimizer will take care
of moving such expressions around.
3. However, using a Python UDF is bad for perf. In your case, if the
problem is that the timestamp is a float, you can cast the float to the
timestamp type, and it should automatically convert it correctly.
Something like this: *col("ts").cast("timestamp")*

On Wed, Aug 30, 2017 at 11:45 AM, Brian Wylie 
wrote:

> Hi All,
>
> I'm using structured streaming in Spark 2.2.
>
> I'm using PySpark and I have data (from a Kafka publisher) where the
> timestamp is a float that looks like this:  1379288667.631940
>
> So here's my code (which is working fine)
>
> # SUBSCRIBE: Setup connection to Kafka Stream
> raw_data = spark.readStream.format('kafka') \
>   .option('kafka.bootstrap.servers', 'localhost:9092') \
>   .option('subscribe', 'dns') \
>   .option('startingOffsets', 'latest') \
>   .load()
>
> # ETL: Hardcoded Schema for DNS records (do this better later)
> from pyspark.sql.types import StructType, StringType, BooleanType,
> IntegerType, FloatType
> from pyspark.sql.functions import from_json, to_json, col, struct
>
> dns_schema = StructType() \
> .add('ts', FloatType()) \
> .add('uid', StringType()) \
> .add('id.orig_h', StringType()) \
>   
>
> # ETL: Convert raw data into parsed and proper typed data
> from pyspark.sql.functions import col, length, to_timestamp
>
> parsed_data = raw_data \
>   .select(from_json(col("value").cast("string"),
> dns_schema).alias('data')) \
>   .select('data.*')
>
> # Convert Bro IDS time to an actual TimeStamp type
> from pyspark.sql.functions import udf
> import datetime
> my_udf = udf(lambda x: datetime.datetime.fromtimestamp(float(x)),
> TimestampType())
> parsed_data_with_dt = parsed_data.withColumn('dt', my_udf('ts'))
>
> # Then a writestream later...
>
> Okay so all this code works fine (the 'dt' field has exactly what I
> want)... but I'll be streaming in a lot of data so here's the questions:
>
> - Will the creation of a new dataframe withColumn basically kill
> performance?
> - Should I move my UDF into the parsed_data.select(...)  part?
> - Can my UDF be done by spark.sql directly? (I tried to_timestamp but
> without luck)
>
> Any suggestions/pointers are greatly appreciated.
>
> -Brian Wylie
>
>
>


Design aspects of Data partitioning for Window functions

2017-08-30 Thread Vasu Gourabathina
All:

If this question was already discussed, please let me know. I can try to
look into the archive.

Data Characteristics:
entity_id  date  fact_1 fact_2  fact_N   derived_1  derived_2  derived_X

a) There are 1000s of such entities in the system
b) Each one has various Fact attributes per each day (to begin with). In
future, we wanted to support multiple entries per day
c) Goal is to calculate various Derived attributes... some of them are
window functions, such as Average, Moving Average, etc.
d) The total number of rows per each entity might not be equally
distributed

Question:
1) What's the best way to partition the data for better performance
optimization? Any things to consider given point #d above?

Sample code:
The following code seems to work fine on a smaller sample size:
  from pyspark.sql.window import Window
  from pyspark.sql.functions import mean

  window = Window.partitionBy('entity_id').orderBy('date').rowsBetween(-30, 0)
  moving_avg = mean(df['fact_1']).over(window)
  df2 = df.withColumn('derived_moving_avg', moving_avg)

Please advise if there are any aspects that need to be considered to make
it efficient to run on a larger data size (with N-node spark cluster).

Thanks in advance,
Vasu.


Python UDF to convert timestamps (performance question)

2017-08-30 Thread Brian Wylie
Hi All,

I'm using structured streaming in Spark 2.2.

I'm using PySpark and I have data (from a Kafka publisher) where the
timestamp is a float that looks like this:  1379288667.631940

So here's my code (which is working fine)

# SUBSCRIBE: Setup connection to Kafka Stream
raw_data = spark.readStream.format('kafka') \
  .option('kafka.bootstrap.servers', 'localhost:9092') \
  .option('subscribe', 'dns') \
  .option('startingOffsets', 'latest') \
  .load()

# ETL: Hardcoded Schema for DNS records (do this better later)
from pyspark.sql.types import StructType, StringType, BooleanType,
IntegerType, FloatType
from pyspark.sql.functions import from_json, to_json, col, struct

dns_schema = StructType() \
.add('ts', FloatType()) \
.add('uid', StringType()) \
.add('id.orig_h', StringType()) \
  

# ETL: Convert raw data into parsed and proper typed data
from pyspark.sql.functions import col, length, to_timestamp

parsed_data = raw_data \
  .select(from_json(col("value").cast("string"), dns_schema).alias('data'))
\
  .select('data.*')

# Convert Bro IDS time to an actual TimeStamp type
from pyspark.sql.functions import udf
from pyspark.sql.types import TimestampType  # needed for the UDF's return type
import datetime
my_udf = udf(lambda x: datetime.datetime.fromtimestamp(float(x)), TimestampType())
parsed_data_with_dt = parsed_data.withColumn('dt', my_udf('ts'))

# Then a writestream later...

Okay so all this code works fine (the 'dt' field has exactly what I
want)... but I'll be streaming in a lot of data so here's the questions:

- Will the creation of a new dataframe withColumn basically kill
performance?
- Should I move my UDF into the parsed_data.select(...)  part?
- Can my UDF be done by spark.sql directly? (I tried to_timestamp but
without luck)

Any suggestions/pointers are greatly appreciated.

-Brian Wylie


Re: [Spark] Can Apache Spark be used with time series processing?

2017-08-30 Thread Irving Duran
I think it will work.  Might want to explore spark streams.


Thank You,

Irving Duran

On Wed, Aug 30, 2017 at 10:50 AM,  wrote:

> I don't see why not
>
> Sent from my iPhone
>
> > On Aug 24, 2017, at 1:52 PM, Alexandr Porunov <
> alexandr.poru...@gmail.com> wrote:
> >
> > Hello,
> >
> > I am new in Apache Spark. I need to process different time series data
> (numeric values which depend on time) and react on next actions:
> > 1. Data is changing up or down too fast.
> > 2. Data is changing constantly up or down too long.
> >
> > For example, if the data have changed 30% up or down in the last five
> minutes (or less), then I need to send a special event.
> > If the data have changed 50% up or down in two hours (or less), then I
> need to send a special event.
> >
> > Frequency of data changing is about 1000-3000 per second. And I need to
> react as soon as possible.
> >
> > Does Apache Spark fit well for this scenario or I need to search for
> another solution?
> > Sorry for stupid question, but I am a total newbie.
> >
> > Regards
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-30 Thread kant kodali
I think I understand *groupByKey/**mapGroupsWithState* and I am still
trying to wrap my head around *GroupState*. So I believe I have a naive
question to ask about *GroupState*.

If I were to represent a state that has a history of events (say 24 hours),
and say the number of events can be big for a given 24-hour period, where
do I store the state S? An external store like Kafka, a database, or a
distributed file system? I wonder if I can represent the state S using a
DataSet that represents the history of events? GroupState also has
.exists() and .get(); if I am not wrong, should I override these methods
so that comparisons and retrieval from an external store can work?

Thanks!



On Wed, Aug 30, 2017 at 1:39 AM, kant kodali  wrote:

> Hi TD,
>
> Thanks for the explanation and for the clear pseudo code and an example!
>
> mapGroupsWithState is cool and looks very flexible, however I have a few
> concerns and questions. For example:
>
> Say I store TrainHistory as a max heap from the Java Collections library and
> I keep adding to this heap for 24 hours; at some point I will run out
> of Java heap space, right? Do I need to store TrainHistory as a DataSet or
> DataFrame instead of an in-memory max heap object from the Java Collections
> library?
>
> I wonder between *Nested query* vs  *groupByKey/**mapGroupsWithState*
> which approach is more efficient to solve this particular problem ?
>
> Thanks!
>
>
>
>
>
> On Tue, Aug 29, 2017 at 9:50 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Aah, I might have misinterpreted. The groupBy + window solution would
>> give the max time for each train over 24 hours (non-overlapping window) of
>> event data (timestamped by activity_timestamp). So the output would be
>> like.
>>
>> Train  Dest  Window(activity_timestamp)    max(Time)
>> 1      HK    Aug28-00:00 to Aug29-00:00    10:00    <- updating
>> currently through aug29
>> 1      HK    Aug27-00:00 to Aug28-00:00    09:00    <- not updating
>> as no new updates coming in with activity_timestamp in this range.
>>
>> The drawback of this approach is that as soon as Aug28 starts, you have
>> wait for new event about a train to get a new max(time). You may rather
>> want a rolling 24 hour period, that is, the max time known over events in
>> the last 24 hours.
>> Then maintaining our own custom state using mapGroupsWithState/
>> flatMapGroupsWithState() is the best and most flexible option.
>> It is available in Spark 2.2 in Scala, Java.
>>
>> Here is an example that tracks sessions based on events.
>> Scala - https://github.com/apache/spark/blob/master/examples/src/
>> main/scala/org/apache/spark/examples/sql/streaming/Structu
>> redSessionization.scala
>>
>> You will have to create a custom per-train state which keeps track of
>> last 24 hours of trains history, and use that state to calculate the max
>> time for each train.
>>
>>
>> def updateHistoryAndGetMax(train: String, events: Iterator[TrainEvents],
>> state: GroupState[TrainHistory]): Long = {
>> // for every event, update history (i.e. last 24 hours of events) and
>> return the max time from the history
>> }
>>
>> trainTimesDataset // Dataset[TrainEvents]
>>   .groupByKey(_.train)
>>   .mapGroupsWithState(updateHistoryAndGetMax)
>>
>> Hope this helps.
>>
>>
>> On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz  wrote:
>>
>>> Hey TD,
>>>
>>> If I understood the question correctly, your solution wouldn't return
>>> the exact solution, since it also groups by on destination. I would say the
>>> easiest solution would be to use flatMapGroupsWithState, where you:
>>> .groupByKey(_.train)
>>>
>>> and keep in state the row with the maximum time.
>>>
>>> On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
 Yes. And in that case, if you just care about only the last few days of
 max, then you should set watermark on the timestamp column.

  *trainTimesDataset*
 *  .withWatermark("**activity_timestamp", "5 days")*
 *  .groupBy(window(activity_timestamp, "24 hours", "24 hours"),
 "train", "dest")*
 *  .max("time")*

 Any counts which are more than 5 days old will be dropped from the
 streaming state.

 On Tue, Aug 29, 2017 at 2:06 PM, kant kodali 
 wrote:

> Hi,
>
> Thanks for the response. Since this is a streaming based query and in
> my case I need to hold state for 24 hours which I forgot to mention in my
> previous email. can I do ?
>
>  *trainTimesDataset.groupBy(window(activity_timestamp, "24 hours",
> "24 hours"), "train", "dest").max("time")*
>
>
> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Say, *trainTimesDataset* is the streaming Dataset of schema *[train:
>> Int, dest: String, time: Timestamp] *
>>
>>
>> *Scala*: *trainTimesDataset.groupBy("train", 

Re: [Structured Streaming]Data processing and output trigger should be decoupled

2017-08-30 Thread Shixiong(Ryan) Zhu
I don't think that's a good idea. If the engine keeps on processing data
but doesn't output anything, where to keep the intermediate data?

On Wed, Aug 30, 2017 at 9:26 AM, KevinZwx  wrote:

> Hi,
>
> I'm working with structured streaming, and I'm wondering whether there
> should be some improvements about trigger.
>
> Currently, when I specify a trigger, i.e. trigger(Trigger.ProcessingTime(
> "10
> minutes")), the engine will begin processing data at the time the trigger
> begins, like 10:00:00, 10:10:00, 10:20:00,..., etc, if the engine takes 10s
> to process this batch of data, then we will get the output result at
> 10:00:10...,  then the engine just waits without processing any data. When
> the next trigger begins, the engine begins to process the data during the
> interval, and if this time the engine takes 15s to process the batch, we
> will get result at 10:10:15. This is the problem.
>
> In my understanding, the trigger and data processing should be decoupled,
> the engine should keep on processing data as fast as possible, but only
> generate output results at each trigger, therefore we can get the result at
> 10:00:00, 10:10:00, 10:20:00, ... So I'm wondering if there is any solution
> or plan to work on this?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


[Structured Streaming]Data processing and output trigger should be decoupled

2017-08-30 Thread KevinZwx
Hi,

I'm working with structured streaming, and I'm wondering whether there
should be some improvements about trigger.

Currently, when I specify a trigger, i.e. trigger(Trigger.ProcessingTime("10
minutes")), the engine will begin processing data at the time the trigger
begins, like 10:00:00, 10:10:00, 10:20:00,..., etc, if the engine takes 10s
to process this batch of data, then we will get the output result at
10:00:10...,  then the engine just waits without processing any data. When
the next trigger begins, the engine begins to process the data during the
interval, and if this time the engine takes 15s to process the batch, we
will get result at 10:10:15. This is the problem.

In my understanding, the trigger and data processing should be decoupled,
the engine should keep on processing data as fast as possible, but only
generate output results at each trigger, therefore we can get the result at
10:00:00, 10:10:00, 10:20:00, ... So I'm wondering if there is any solution
or plan to work on this?



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark 2.2 structured streaming with mapGroupsWithState + window functions

2017-08-30 Thread kant kodali
+1

Is this ticket related https://issues.apache.org/jira/browse/SPARK-21641 ?

On Mon, Aug 28, 2017 at 7:06 AM, daniel williams 
wrote:

> Hi all,
>
> I've been looking heavily into Spark 2.2 to solve a problem I have by
> specifically using mapGroupsWithState.  What I've discovered is that a
> *groupBy(window(..))* does not work when being used with a subsequent
> *mapGroupsWithState* and produces an AnalysisException of :
>
> *"mapGroupsWithState is not supported with aggregation on a streaming
> DataFrame/Dataset;;"*
>
> I have http logs that have been rolled up via a previous jobs window
> function in the form of:
>
> {"when": {"from": "2017-01-01T00:00:10", "to": "2017-01-01T00:00:11"},
> "account": "A", "verb": "GET","statusCode": 500, "eventCount": 10}
> {"when": {"from": "2017-01-01T00:00:10", "to": "2017-01-01T00:00:11"},
> "account": "A", "verb": "GET","statusCode": 200, "eventCount": 89}
>
> In this data the *when* sub-object is in one-minute blocks.  I'd like to
> use a *window* function to aggregate that to 10-minute windows and sum
> the eventCount by grouping on account, verb, and statusCode.  From there
> I'd like to *mapGroupsWithState* for each *account* and *verb* to produce
> buckets for some configurable window, say 10 minutes for example's sake, of
> the form:
>
> {"when": {"from": "2017-01-01T00:00:10", "to": "2017-01-01T00:00:20"},
> "account": "A", "verb": "GET", "totalRequests": 999, "totalErrors": 198}
>
> *mapGroupsWithState* is perfect for this but, as stated, I've not found a
> way to apply a window function *and* use the mapsGroupsWithState.
>
> Example:
>
> ds.withColumn("bucket", $"when.from")
>   .withWatermark("bucket", "1 minutes")
>   // buckets and sums smaller windowed events into a rolled-up larger window
>   // event with a summed eventCount
>   .groupBy(window($"bucket", "10 minutes"),
>     $"account",
>     $"verb",
>     $"statusCode")
>   .agg(
>     sum($"eventCount")
>   )
>   .map(r => Log())
>   .groupByKey(l => (l.when, l.account, l.verb)) // maps
>   // will calculate totalErrors / totalRequests per bucket
>   .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.EventTimeTimeout()) {
>     case ((when: Window, account: String, verb: String),
>           events: Iterator[Log],
>           state: GroupState[SessionInfo]) => {
>       ..
>     }
>   }
>
>
> Any suggestions would be greatly appreciated.
>
> I've also noticed that *groupByKey().reduceGroups()* does not work with 
> *mapGroupsWithState
> *which is another strategy that I've tried.
>
> Thanks.
>
> dan
>


Do we always need to go through spark-submit?

2017-08-30 Thread kant kodali
Hi All,

I understand spark-submit sets up its own class loader and other things but
I am wondering if it  is possible to just compile the code and run it using
"java -jar mysparkapp.jar" ?

Thanks,
kant


Re: [Spark] Can Apache Spark be used with time series processing?

2017-08-30 Thread kanth909
I don't see why not

Sent from my iPhone

> On Aug 24, 2017, at 1:52 PM, Alexandr Porunov  
> wrote:
> 
> Hello,
> 
> I am new to Apache Spark. I need to process different time series data 
> (numeric values which depend on time) and react to the following conditions:
> 1. The data is changing up or down too fast.
> 2. The data is changing constantly up or down for too long.
> 
> For example, if the data has changed 30% up or down in the last five minutes 
> (or less), then I need to send a special event.
> If the data has changed 50% up or down in two hours (or less), then I need 
> to send a special event.
> 
> The frequency of data changes is about 1000-3000 per second. And I need to react 
> as soon as possible. 
> 
> Does Apache Spark fit well for this scenario, or do I need to search for 
> another solution?
> Sorry for the stupid question, but I am a total newbie.
> 
> Regards

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Sync commit to kafka 0.10

2017-08-30 Thread krot.vyacheslav
Hi,

I'm looking for a way to make a sync commit of offsets to Kafka 0.10.
commitAsync works well, but I'd like to proceed to the next job only after a
successful commit; a small additional latency is not an issue for my
use case. I know I can store offsets somewhere else, but the built-in Kafka
offset storage looks good and easy to use. What is the correct way to do this?
I tried the first approach that came to my mind, like this:
val latch = new CountDownLatch(1)
commitAsync(offsetRanges, new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        latch.countDown();
    }
});
latch.await();

but this does not work - I get very weird effects - new data from kafka is
read with great delays.

Is there an elegant right way to do this?



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Updates on migration guides

2017-08-30 Thread Nick Pentreath
MLlib has tried quite hard to ensure the migration guide is up to date for
each release. I think generally we catch all breaking and most major
behavior changes.

On Wed, 30 Aug 2017 at 17:02, Dongjoon Hyun  wrote:

> +1
>
> On Wed, Aug 30, 2017 at 7:54 AM, Xiao Li  wrote:
>
>> Hi, Devs,
>>
>> Many questions from the open source community are actually caused by the
>> behavior changes we made in each release. So far, the migration guides
>> (e.g.,
>> https://spark.apache.org/docs/latest/sql-programming-guide.html#migration-guide)
>> were not being properly updated. In the last few releases, multiple
>> behavior changes are not documented in migration guides and even release
>> notes. I propose to do the document updates in the same PRs that introduce
>> the behavior changes. If the contributors can't make it, the committers who
>> merge the PRs need to do it instead. We also can create a dedicated page
>> for migration guides of all the components. Hopefully, this can assist the
>> migration efforts.
>>
>> Thanks,
>>
>> Xiao Li
>>
>
>


Different watermark for different kafka partitions in Structured Streaming

2017-08-30 Thread KevinZwx
Hi,

I'm working with Structured Streaming to process logs from kafka and use
watermark to handle late events. Currently the watermark is computed by (max
event time seen by the engine - late threshold), and the same watermark is
used for all partitions.

But in production environment it happens frequently that different partition
is consumed at different speed, the consumption of some partitions may be
left behind, so the newest event time in these partitions may be much
smaller than the others'. In this case using the same watermark for all
partitions may cause heavy data loss. 

So is there any way to achieve different watermark for different kafka
partition or any plan to work on this?



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Different watermark for different kafka partitions in Structured Streaming

2017-08-30 Thread 张万新
Hi,

I'm working with Structured Streaming to process logs from kafka and use
watermark to handle late events. Currently the watermark is computed by (max
event time seen by the engine - late threshold), and the same watermark is
used for all partitions.

But in production environment it happens frequently that different
partition is consumed at different speed, the consumption of some
partitions may be left behind, so the newest event time in these partitions
may be much smaller than the others'. In this case using the same
watermark for all partitions may cause heavy data loss.

So is there any way to achieve different watermark for different kafka
partition or any plan to work on this?


Re: Select entire row based on a logic applied on 2 columns across multiple rows

2017-08-30 Thread Andrés Ivaldi
I see. As @ayan said, it's valid, but why not use the API or SQL? The
built-in options are optimized.
I understand that the SQL API is hard to build an API over,
but the Spark API isn't, and you can do a lot with it.

regards,


On Wed, Aug 30, 2017 at 10:31 AM, ayan guha  wrote:

> Well, using raw sql is a valid option, but if you do not want you can
> always implement the concept using apis. All these constructs have api
> counterparts, such as filter, window, over, row number etc.
>
> On Wed, 30 Aug 2017 at 10:49 pm, purna pradeep 
> wrote:
>
>> @Andres I need the latest, but it should be less than 10 months old based on
>> the income_age column, and I don't want to use SQL here
>>
>> On Wed, Aug 30, 2017 at 8:08 AM Andrés Ivaldi  wrote:
>>
>>> Hi, if you need the last value from income in window function you can
>>> use last_value.
>>> Not tested, but maybe with @ayan's sql:
>>>
>>> spark.sql("select *, row_number(), last_value(income) over (partition by
>>> id order by income_age_ts desc) r from t")
>>>
>>>
>>> On Tue, Aug 29, 2017 at 11:30 PM, purna pradeep >> > wrote:
>>>
 @ayan,

 Thanks for your response

 I would like to have functions in this case  calculateIncome and the
 reason why I need function is to reuse in other parts of the application
 ..that's the reason I'm planning for mapgroups with function as argument
 which takes rowiterator ..but not sure if this is the best to implement as
 my initial dataframe is very large

 On Tue, Aug 29, 2017 at 10:24 PM ayan guha  wrote:

> Hi
>
> the tool you are looking for is window function.  Example:
>
> >>> df.show()
> +++---+--+-+
> |JoinDate|dept| id|income|income_age_ts|
> +++---+--+-+
> | 4/20/13|  ES|101| 19000|  4/20/17|
> | 4/20/13|  OS|101| 1|  10/3/15|
> | 4/20/12|  DS|102| 13000|   5/9/17|
> | 4/20/12|  CS|102| 12000|   5/8/17|
> | 4/20/10|  EQ|103| 1|   5/9/17|
> | 4/20/10|  MD|103|  9000|   5/8/17|
> +++---+--+-+
>
> >>> res = spark.sql("select *, row_number() over (partition by id
> order by income_age_ts desc) r from t")
> >>> res.show()
> +++---+--+-+---+
> |JoinDate|dept| id|income|income_age_ts|  r|
> +++---+--+-+---+
> | 4/20/10|  EQ|103| 1|   5/9/17|  1|
> | 4/20/10|  MD|103|  9000|   5/8/17|  2|
> | 4/20/13|  ES|101| 19000|  4/20/17|  1|
> | 4/20/13|  OS|101| 1|  10/3/15|  2|
> | 4/20/12|  DS|102| 13000|   5/9/17|  1|
> | 4/20/12|  CS|102| 12000|   5/8/17|  2|
> +++---+--+-+---+
>
> >>> res = spark.sql("select * from (select *, row_number() over
> (partition by id order by income_age_ts desc) r from t) x where r=1")
> >>> res.show()
> +++---+--+-+---+
> |JoinDate|dept| id|income|income_age_ts|  r|
> +++---+--+-+---+
> | 4/20/10|  EQ|103| 1|   5/9/17|  1|
> | 4/20/13|  ES|101| 19000|  4/20/17|  1|
> | 4/20/12|  DS|102| 13000|   5/9/17|  1|
> +++---+--+-+---+
>
> This should be better because it uses all in-built optimizations in
> Spark.
>
> Best
> Ayan
>
> On Wed, Aug 30, 2017 at 11:06 AM, purna pradeep <
> purna2prad...@gmail.com> wrote:
>
>> Please click on unnamed text/html  link for better view
>>
>> On Tue, Aug 29, 2017 at 8:11 PM purna pradeep <
>> purna2prad...@gmail.com> wrote:
>>
>>>
>>> -- Forwarded message -
>>> From: Mamillapalli, Purna Pradeep >> capitalone.com>
>>> Date: Tue, Aug 29, 2017 at 8:08 PM
>>> Subject: Spark question
>>> To: purna pradeep 
>>>
>>> Below is the input Dataframe(In real this is a very large Dataframe)
>>>
>>>
>>>
>>> EmployeeID  INCOME  INCOME AGE TS  JoinDate  Dept
>>> 101         19000   4/20/17        4/20/13   ES
>>> 101         1       10/3/15        4/20/13   OS
>>> 102         13000   5/9/17         4/20/12   DS
>>> 102         12000   5/8/17         4/20/12   CS
>>> 103         1       5/9/17         4/20/10   EQ
>>> 103         9000    5/8/15         4/20/10   MD
>>>
>>> Get the 

RE: from_json()

2017-08-30 Thread JG Perrin
Hey Sam,

Nope – it does not work the way I want. I guess it is only working with one 
type…

Trying to convert:
{"releaseDate":147944880,"link":"http://amzn.to/2kup94P","id":1,"authorId":1,"title":"Fantastic
 Beasts and Where to Find Them: The Original Screenplay"}

I get:
[Executor task launch worker for task 3:ERROR] Logging$class: Exception in task 
0.0 in stage 3.0 (TID 3)
java.lang.IllegalArgumentException: Failed to convert the JSON string 
'{"releaseDate":147944880,"link":"http://amzn.to/2kup94P","id":1,"authorId":1,"title":"Fantastic
 Beasts and Where to Find Them: The Original Screenplay"}' to a data type.
   at org.apache.spark.sql.types.DataType$.parseDataType(DataType.scala:176)
   at org.apache.spark.sql.types.DataType$.fromJson(DataType.scala:108)
   at org.apache.spark.sql.types.DataType.fromJson(DataType.scala)
   at 
net.jgp.labs.spark.l250_map.l031_dataset_book_json_in_progress.CsvToDatasetBookAsJson$BookMapper.call(CsvToDatasetBookAsJson.java:44)
   at 
net.jgp.labs.spark.l250_map.l031_dataset_book_json_in_progress.CsvToDatasetBookAsJson$BookMapper.call(CsvToDatasetBookAsJson.java:1)
   at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
   at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
   at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
   at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
   at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
   at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
   at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
   at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
   at org.apache.spark.scheduler.Task.run(Task.scala:108)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
   at java.lang.Thread.run(Unknown Source)
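
For what it's worth, DataType.fromJson expects the JSON representation of a
schema (what StructType.json() produces), not a data record, which would
explain the exception above. Below is a hedged, self-contained sketch of
parsing that record with an explicit schema and from_json; the field types are
guesses from the sample.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

object FromJsonSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("FromJsonSketch").master("local[*]").getOrCreate()
    import spark.implicits._

    // Schema built by hand for the book record above; types are assumptions.
    val bookSchema = new StructType()
      .add("releaseDate", LongType)
      .add("link", StringType)
      .add("id", IntegerType)
      .add("authorId", IntegerType)
      .add("title", StringType)

    val df = Seq(
      """{"releaseDate":147944880,"link":"http://amzn.to/2kup94P","id":1,"authorId":1,"title":"Fantastic Beasts"}"""
    ).toDF("value")

    df.select(from_json($"value", bookSchema).as("book")).select("book.*").show(false)
  }
}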




From: JG Perrin [mailto:jper...@lumeris.com]
Sent: Monday, August 28, 2017 1:29 PM
To: Sam Elamin 
Cc: user@spark.apache.org
Subject: RE: from_json()

Thanks Sam – this might be the solution. I will investigate!

From: Sam Elamin [mailto:hussam.ela...@gmail.com]
Sent: Monday, August 28, 2017 1:14 PM
To: JG Perrin >
Cc: user@spark.apache.org
Subject: Re: from_json()

Hi jg,

Perhaps I am misunderstanding you, but if you just want to create a new schema 
from a df its fairly simple, assuming you have a schema already predefined or 
in a string. i.e.

val newSchema = DataType.fromJson(json_schema_string)

then all you need to do is re-create the dataframe using this new schema

sqlContext.createDataFrame(oldDF.rdd,newSchema)
Regards
Sam

On Mon, Aug 28, 2017 at 5:57 PM, JG Perrin 
> wrote:
Is there a way to not have to specify a schema when using from_json() or infer 
the schema? When you read a JSON doc from disk, you can infer the schema. 
Should I write it to disk before (ouch)?

jg


This electronic transmission and any documents accompanying this electronic 
transmission contain confidential information belonging to the sender. This 
information may contain confidential health information that is legally 
privileged. The information is intended only for the use of the individual or 
entity named above. The authorized recipient of this transmission is prohibited 
from disclosing this information to any other party unless required to do so by 
law or regulation and is required to delete or destroy the information after 
its stated need has been fulfilled. If you are not the intended recipient, you 
are hereby notified that any disclosure, copying, distribution or the taking of 
any action in reliance on or regarding the contents of this electronically 
transmitted information is strictly prohibited. If you have received this 
E-mail in error, please notify the sender and delete this message immediately.



Re: Select entire row based on a logic applied on 2 columns across multiple rows

2017-08-30 Thread ayan guha
Well, using raw sql is a valid option, but if you do not want you can
always implement the concept using apis. All these constructs have api
counterparts, such as filter, window, over, row number etc.

On Wed, 30 Aug 2017 at 10:49 pm, purna pradeep 
wrote:

> @Andres I need the latest, but it should be less than 10 months old based on
> the income_age column, and I don't want to use SQL here
>
> On Wed, Aug 30, 2017 at 8:08 AM Andrés Ivaldi  wrote:
>
>> Hi, if you need the last value from income in window function you can use
>> last_value.
>> Not tested, but maybe with @ayan's sql:
>>
>> spark.sql("select *, row_number(), last_value(income) over (partition by
>> id order by income_age_ts desc) r from t")
>>
>>
>> On Tue, Aug 29, 2017 at 11:30 PM, purna pradeep 
>> wrote:
>>
>>> @ayan,
>>>
>>> Thanks for your response
>>>
>>> I would like to have functions in this case  calculateIncome and the
>>> reason why I need function is to reuse in other parts of the application
>>> ..that's the reason I'm planning for mapgroups with function as argument
>>> which takes rowiterator ..but not sure if this is the best to implement as
>>> my initial dataframe is very large
>>>
>>> On Tue, Aug 29, 2017 at 10:24 PM ayan guha  wrote:
>>>
 Hi

 the tool you are looking for is window function.  Example:

 >>> df.show()
 +++---+--+-+
 |JoinDate|dept| id|income|income_age_ts|
 +++---+--+-+
 | 4/20/13|  ES|101| 19000|  4/20/17|
 | 4/20/13|  OS|101| 1|  10/3/15|
 | 4/20/12|  DS|102| 13000|   5/9/17|
 | 4/20/12|  CS|102| 12000|   5/8/17|
 | 4/20/10|  EQ|103| 1|   5/9/17|
 | 4/20/10|  MD|103|  9000|   5/8/17|
 +++---+--+-+

 >>> res = spark.sql("select *, row_number() over (partition by id order
 by income_age_ts desc) r from t")
 >>> res.show()
 +++---+--+-+---+
 |JoinDate|dept| id|income|income_age_ts|  r|
 +++---+--+-+---+
 | 4/20/10|  EQ|103| 1|   5/9/17|  1|
 | 4/20/10|  MD|103|  9000|   5/8/17|  2|
 | 4/20/13|  ES|101| 19000|  4/20/17|  1|
 | 4/20/13|  OS|101| 1|  10/3/15|  2|
 | 4/20/12|  DS|102| 13000|   5/9/17|  1|
 | 4/20/12|  CS|102| 12000|   5/8/17|  2|
 +++---+--+-+---+

 >>> res = spark.sql("select * from (select *, row_number() over
 (partition by id order by income_age_ts desc) r from t) x where r=1")
 >>> res.show()
 +++---+--+-+---+
 |JoinDate|dept| id|income|income_age_ts|  r|
 +++---+--+-+---+
 | 4/20/10|  EQ|103| 1|   5/9/17|  1|
 | 4/20/13|  ES|101| 19000|  4/20/17|  1|
 | 4/20/12|  DS|102| 13000|   5/9/17|  1|
 +++---+--+-+---+

 This should be better because it uses all in-built optimizations in
 Spark.

 Best
 Ayan

 On Wed, Aug 30, 2017 at 11:06 AM, purna pradeep <
 purna2prad...@gmail.com> wrote:

> Please click on unnamed text/html  link for better view
>
> On Tue, Aug 29, 2017 at 8:11 PM purna pradeep 
> wrote:
>
>>
>> -- Forwarded message -
>> From: Mamillapalli, Purna Pradeep <
>> purnapradeep.mamillapa...@capitalone.com>
>> Date: Tue, Aug 29, 2017 at 8:08 PM
>> Subject: Spark question
>> To: purna pradeep 
>>
>> Below is the input Dataframe(In real this is a very large Dataframe)
>>
>>
>>
>> EmployeeID  INCOME  INCOME AGE TS  JoinDate  Dept
>> 101         19000   4/20/17        4/20/13   ES
>> 101         1       10/3/15        4/20/13   OS
>> 102         13000   5/9/17         4/20/12   DS
>> 102         12000   5/8/17         4/20/12   CS
>> 103         1       5/9/17         4/20/10   EQ
>> 103         9000    5/8/15         4/20/10   MD
>>
>> Get the latest income of an employee which has  Income_age ts <10
>> months
>>
>> Expected output Dataframe
>>
>> EmployeeID  INCOME  INCOME AGE TS  JoinDate  Dept
>> 101         19000   4/20/17        4/20/13   ES
>> 102         13000   5/9/17         4/20/12   DS
>> 103         1       5/9/17
>>

Re: Select entire row based on a logic applied on 2 columns across multiple rows

2017-08-30 Thread purna pradeep
@Andres I need the latest, but it should be less than 10 months old based on
the income_age column, and I don't want to use SQL here

On Wed, Aug 30, 2017 at 8:08 AM Andrés Ivaldi  wrote:

> Hi, if you need the last value from income in window function you can use
> last_value.
> Not tested, but maybe with @ayan's sql:
>
> spark.sql("select *, row_number(), last_value(income) over (partition by
> id order by income_age_ts desc) r from t")
>
>
> On Tue, Aug 29, 2017 at 11:30 PM, purna pradeep 
> wrote:
>
>> @ayan,
>>
>> Thanks for your response
>>
>> I would like to have functions in this case  calculateIncome and the
>> reason why I need function is to reuse in other parts of the application
>> ..that's the reason I'm planning for mapgroups with function as argument
>> which takes rowiterator ..but not sure if this is the best to implement as
>> my initial dataframe is very large
>>
>> On Tue, Aug 29, 2017 at 10:24 PM ayan guha  wrote:
>>
>>> Hi
>>>
>>> the tool you are looking for is window function.  Example:
>>>
>>> >>> df.show()
>>> +++---+--+-+
>>> |JoinDate|dept| id|income|income_age_ts|
>>> +++---+--+-+
>>> | 4/20/13|  ES|101| 19000|  4/20/17|
>>> | 4/20/13|  OS|101| 1|  10/3/15|
>>> | 4/20/12|  DS|102| 13000|   5/9/17|
>>> | 4/20/12|  CS|102| 12000|   5/8/17|
>>> | 4/20/10|  EQ|103| 1|   5/9/17|
>>> | 4/20/10|  MD|103|  9000|   5/8/17|
>>> +++---+--+-+
>>>
>>> >>> res = spark.sql("select *, row_number() over (partition by id order
>>> by income_age_ts desc) r from t")
>>> >>> res.show()
>>> +++---+--+-+---+
>>> |JoinDate|dept| id|income|income_age_ts|  r|
>>> +++---+--+-+---+
>>> | 4/20/10|  EQ|103| 1|   5/9/17|  1|
>>> | 4/20/10|  MD|103|  9000|   5/8/17|  2|
>>> | 4/20/13|  ES|101| 19000|  4/20/17|  1|
>>> | 4/20/13|  OS|101| 1|  10/3/15|  2|
>>> | 4/20/12|  DS|102| 13000|   5/9/17|  1|
>>> | 4/20/12|  CS|102| 12000|   5/8/17|  2|
>>> +++---+--+-+---+
>>>
>>> >>> res = spark.sql("select * from (select *, row_number() over
>>> (partition by id order by income_age_ts desc) r from t) x where r=1")
>>> >>> res.show()
>>> +++---+--+-+---+
>>> |JoinDate|dept| id|income|income_age_ts|  r|
>>> +++---+--+-+---+
>>> | 4/20/10|  EQ|103| 1|   5/9/17|  1|
>>> | 4/20/13|  ES|101| 19000|  4/20/17|  1|
>>> | 4/20/12|  DS|102| 13000|   5/9/17|  1|
>>> +++---+--+-+---+
>>>
>>> This should be better because it uses all in-built optimizations in
>>> Spark.
>>>
>>> Best
>>> Ayan
>>>
>>> On Wed, Aug 30, 2017 at 11:06 AM, purna pradeep >> > wrote:
>>>
 Please click on unnamed text/html  link for better view

 On Tue, Aug 29, 2017 at 8:11 PM purna pradeep 
 wrote:

>
> -- Forwarded message -
> From: Mamillapalli, Purna Pradeep <
> purnapradeep.mamillapa...@capitalone.com>
> Date: Tue, Aug 29, 2017 at 8:08 PM
> Subject: Spark question
> To: purna pradeep 
>
> Below is the input Dataframe(In real this is a very large Dataframe)
>
> EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
> 101        | 19000  | 4/20/17       | 4/20/13  | ES
> 101        | 1      | 10/3/15       | 4/20/13  | OS
> 102        | 13000  | 5/9/17        | 4/20/12  | DS
> 102        | 12000  | 5/8/17        | 4/20/12  | CS
> 103        | 1      | 5/9/17        | 4/20/10  | EQ
> 103        | 9000   | 5/8/15        | 4/20/10  | MD
>
> Get the latest income of an employee which has Income_age ts < 10 months
>
> Expected output Dataframe
>
> EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
> 101        | 19000  | 4/20/17       | 4/20/13  | ES
> 102        | 13000  | 5/9/17        | 4/20/12  | DS
> 103        | 1      | 5/9/17        | 4/20/10  | EQ
>
>
>





 Below is what I'm planning to implement:
>
> case class employee(EmployeeID: Int, INCOME: Int, INCOMEAGE: java.sql.Date,
>   JOINDATE: java.sql.Date, DEPT: String)
>
> val empSchema = new StructType()
>   .add("EmployeeID", "Int")
>   .add("INCOME", "Int")
>   .add("INCOMEAGE", "Date")
>   .add("JOINDATE", "Date")
>   .add("DEPT", "String")
>
> //Reading from 

Re: Select entire row based on a logic applied on 2 columns across multiple rows

2017-08-30 Thread Andrés Ivaldi
Hi, if you need the last value of income within the window function, you can
use last_value.
Not tested, but maybe combined with @ayan's SQL:

spark.sql("select *, row_number(), last_value(income) over (partition by id
order by income_age_ts desc) r from t")
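
One caveat worth checking with the last_value route (sketch only, not tested):
with an ORDER BY in the window, Spark's default frame ends at the current row,
so last_value(income) tends to return the current row's income rather than the
latest one. first_value with a descending sort (or an explicit ROWS BETWEEN
frame) avoids that; the "< 10 months" recency filter would still need to be
added separately:

// first_value with a descending sort gives the latest income per id
spark.sql("""
  select *,
         first_value(income) over (partition by id order by income_age_ts desc) as latest_income
  from t
""")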


On Tue, Aug 29, 2017 at 11:30 PM, purna pradeep 
wrote:

> @ayan,
>
> Thanks for your response
>
> I would like to have a function in this case, calculateIncome, and the
> reason I need a function is to reuse it in other parts of the application.
> That's why I'm planning to use mapGroups with the function as an argument,
> which takes a row iterator. But I'm not sure whether this is the best way
> to implement it, as my initial dataframe is very large.
>
> On Tue, Aug 29, 2017 at 10:24 PM ayan guha  wrote:
>
>> Hi
>>
>> the tool you are looking for is window function.  Example:
>>
>> >>> df.show()
>> +++---+--+-+
>> |JoinDate|dept| id|income|income_age_ts|
>> +++---+--+-+
>> | 4/20/13|  ES|101| 19000|  4/20/17|
>> | 4/20/13|  OS|101| 1|  10/3/15|
>> | 4/20/12|  DS|102| 13000|   5/9/17|
>> | 4/20/12|  CS|102| 12000|   5/8/17|
>> | 4/20/10|  EQ|103| 1|   5/9/17|
>> | 4/20/10|  MD|103|  9000|   5/8/17|
>> +++---+--+-+
>>
>> >>> res = spark.sql("select *, row_number() over (partition by id order
>> by income_age_ts desc) r from t")
>> >>> res.show()
>> +++---+--+-+---+
>> |JoinDate|dept| id|income|income_age_ts|  r|
>> +++---+--+-+---+
>> | 4/20/10|  EQ|103| 1|   5/9/17|  1|
>> | 4/20/10|  MD|103|  9000|   5/8/17|  2|
>> | 4/20/13|  ES|101| 19000|  4/20/17|  1|
>> | 4/20/13|  OS|101| 1|  10/3/15|  2|
>> | 4/20/12|  DS|102| 13000|   5/9/17|  1|
>> | 4/20/12|  CS|102| 12000|   5/8/17|  2|
>> +++---+--+-+---+
>>
>> >>> res = spark.sql("select * from (select *, row_number() over
>> (partition by id order by income_age_ts desc) r from t) x where r=1")
>> >>> res.show()
>> +++---+--+-+---+
>> |JoinDate|dept| id|income|income_age_ts|  r|
>> +++---+--+-+---+
>> | 4/20/10|  EQ|103| 1|   5/9/17|  1|
>> | 4/20/13|  ES|101| 19000|  4/20/17|  1|
>> | 4/20/12|  DS|102| 13000|   5/9/17|  1|
>> +++---+--+-+---+
>>
>> This should be better because it uses all in-built optimizations in Spark.
>>
>> Best
>> Ayan
>>
>> On Wed, Aug 30, 2017 at 11:06 AM, purna pradeep 
>> wrote:
>>
>>> Please click on the unnamed text/html link for a better view
>>>
>>> On Tue, Aug 29, 2017 at 8:11 PM purna pradeep 
>>> wrote:
>>>

 -- Forwarded message -
 From: Mamillapalli, Purna Pradeep 
 Date: Tue, Aug 29, 2017 at 8:08 PM
 Subject: Spark question
 To: purna pradeep 

 Below is the input Dataframe(In real this is a very large Dataframe)

 EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
 101        | 19000  | 4/20/17       | 4/20/13  | ES
 101        | 1      | 10/3/15       | 4/20/13  | OS
 102        | 13000  | 5/9/17        | 4/20/12  | DS
 102        | 12000  | 5/8/17        | 4/20/12  | CS
 103        | 1      | 5/9/17        | 4/20/10  | EQ
 103        | 9000   | 5/8/15        | 4/20/10  | MD

 Get the latest income of an employee which has Income_age ts < 10 months

 Expected output Dataframe

 EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
 101        | 19000  | 4/20/17       | 4/20/13  | ES
 102        | 13000  | 5/9/17        | 4/20/12  | DS
 103        | 1      | 5/9/17        | 4/20/10  | EQ



>>>
>>>
>>>
>>>
>>>
>>> Below is what I'm planning to implement:

 case class employee(EmployeeID: Int, INCOME: Int, INCOMEAGE: java.sql.Date,
   JOINDATE: java.sql.Date, DEPT: String)

 val empSchema = new StructType()
   .add("EmployeeID", "Int")
   .add("INCOME", "Int")
   .add("INCOMEAGE", "Date")
   .add("JOINDATE", "Date")
   .add("DEPT", "String")

 // Reading from the file
 import sparkSession.implicits._

 val readEmpFile = sparkSession.read
   .option("sep", ",")
   .schema(empSchema)
   .csv(INPUT_DIRECTORY)

 // Create the employee Dataset
 val custDf = readEmpFile.as[employee]

 // Group by EmployeeID
 val groupByDf = custDf.groupByKey(a => a.EmployeeID)

 val k = groupByDf.mapGroups((key, value) => 
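
For illustration, a minimal (untested) sketch of how that mapGroups idea could
be completed with a reusable calculateIncome function. The 10-month cutoff,
flatMapGroups and Option handling below are assumptions for illustration,
based on the employee case class above with INCOMEAGE as a date:

import java.sql.Date
import java.time.LocalDate

// reusable business logic: latest income among records newer than 10 months
def calculateIncome(rows: Iterator[employee]): Option[employee] = {
  val cutoff = Date.valueOf(LocalDate.now.minusMonths(10))
  rows
    .filter(e => e.INCOMEAGE.after(cutoff))                               // drop stale incomes
    .reduceOption((a, b) => if (a.INCOMEAGE.after(b.INCOMEAGE)) a else b) // keep the newest row
}

// flatMapGroups (instead of mapGroups) lets employees with no qualifying
// income simply drop out of the result
val latestIncomes = groupByDf.flatMapGroups((key, rows) => calculateIncome(rows).toSeq)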

[SS] StateStoreSaveExec in Complete output mode and metrics in stateOperators

2017-08-30 Thread Jacek Laskowski
Hi,

I've been reviewing how StateStoreSaveExec works per output mode
focusing on Complete output mode [1] at the moment.

My understanding is that while in Complete output mode
StateStoreSaveExec uses the metrics as follows:

* numRowsTotal is the number of all the state keys in the state store
* numRowsUpdated is the number of the state keys that were updated in
the state store (i.e. the keys were available earlier and appeared in
the result rows of the upstream physical operator)

With that I wrote the following query (that's described in more detail
in the Spark Structured Streaming gitbook [2]):

val valuesPerGroup = spark.
  readStream.
  format("kafka").
  option("subscribe", "topic1").
  option("kafka.bootstrap.servers", "localhost:9092").
  load.
  withColumn("tokens", split('value, ",")).
  withColumn("group", 'tokens(0)).
  withColumn("value", 'tokens(1) cast "int").
  select("group", "value").
  groupBy($"group").
  agg(collect_list("value") as "values").
  orderBy($"group".asc)
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
val sq = valuesPerGroup.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Complete).
  start

---
Batch: 0
---
+-+--+
|group|values|
+-+--+
+-+--+

// there's only 1 stateful operator and hence 0 for the index in stateOperators
scala> println(sq.lastProgress.stateOperators(0).prettyJson)
{
  "numRowsTotal" : 0,
  "numRowsUpdated" : 0,
  "memoryUsedBytes" : 60
}

// publish 1 new key and value in a single streaming batch
// 0,1

---
Batch: 1
---
+-+--+
|group|values|
+-+--+
|0|[1]   |
+-+--+

// it's Complete output mode so numRowsTotal is the number of keys in
// the state store
// no keys were available earlier (it's just started!) and so
// numRowsUpdated is 0
scala> println(sq.lastProgress.stateOperators(0).prettyJson)
{
  "numRowsTotal" : 1,
  "numRowsUpdated" : 0,
  "memoryUsedBytes" : 324
}

// publish new key and old key in a single streaming batch
// new keys
// 1,1
// updates to already-stored keys
// 0,2

---
Batch: 2
---
+-+--+
|group|values|
+-+--+
|0|[2, 1]|
|1|[1]   |
+-+--+

// it's Complete output mode so numRowsTotal is the number of keys in
// the state store
// no keys were available earlier and so numRowsUpdated is...0?!
// Think it's a BUG as it should've been 1 (for the row 0,2)
scala> println(sq.lastProgress.stateOperators(0).prettyJson)
{
  "numRowsTotal" : 2,
  "numRowsUpdated" : 0,
  "memoryUsedBytes" : 572
}

As you can see, numRowsUpdated is 0, while I think it should have been 1
(for the row 0,2) as the key 0 was already in the state store.

There's this code [3], which resets the newNumRowsUpdated metric when no
new data was available, and that looks fishy to me.

Is my understanding correct and numRowsUpdated should be 1 in Complete
output mode? Where's the issue (in the code or in my understanding)?
I'd appreciate any help. Thanks!

[1] 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L249

[2] 
https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-StateStoreSaveExec.html

[3] 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala#L192

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming (Apache Spark 2.2+)
https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-30 Thread kant kodali
Hi TD,

Thanks for the explanation and for the clear pseudo code and an example!

mapGroupsWithState is cool and looks very flexible, however I have a few
concerns and questions. For example:

Say I store TrainHistory as a max heap from the Java Collections library and
I keep adding to this heap for 24 hours; at some point I will run out of Java
heap space, right? Do I need to store TrainHistory as a Dataset or DataFrame
instead of an in-memory max heap object from the Java Collections library?

I wonder, between a *nested query* and *groupByKey/mapGroupsWithState*, which
approach is more efficient for solving this particular problem?

Thanks!
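
For what it's worth, a minimal (untested) sketch of how the per-train history
could be kept bounded inside the mapGroupsWithState state by pruning anything
older than 24 hours on every call, instead of letting an in-memory heap grow.
The TrainEvent/TrainHistory classes and the processing-time cutoff are
assumptions for illustration only:

import java.sql.Timestamp
import org.apache.spark.sql.streaming.GroupState

// hypothetical classes matching the [train: Int, dest: String, time: Timestamp] schema
case class TrainEvent(train: Int, dest: String, time: Timestamp)
case class TrainHistory(events: Seq[TrainEvent])

def updateHistoryAndGetMax(
    train: Int,
    newEvents: Iterator[TrainEvent],
    state: GroupState[TrainHistory]): Long = {
  val dayMs  = 24L * 60 * 60 * 1000
  val cutoff = System.currentTimeMillis() - dayMs                     // processing-time cutoff, for simplicity
  val history = state.getOption.map(_.events).getOrElse(Seq.empty)
  // merge the new events into the stored history and drop anything older than 24 hours
  val pruned = (history ++ newEvents).filter(_.time.getTime >= cutoff)
  state.update(TrainHistory(pruned))
  if (pruned.isEmpty) 0L else pruned.map(_.time.getTime).max          // max event time in the kept window
}

// trainTimesDataset: Dataset[TrainEvent]; spark.implicits._ provides the encoders
val maxTimes = trainTimesDataset
  .groupByKey(_.train)
  .mapGroupsWithState(updateHistoryAndGetMax _)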





On Tue, Aug 29, 2017 at 9:50 PM, Tathagata Das 
wrote:

> Aah, I might have misinterpreted. The groupBy + window solution would give
> the max time for each train over 24 hours (non-overlapping window) of event
> data (timestamped by activity_timestamp). So the output would be like.
>
> Train | Dest | Window(activity_timestamp)  | max(Time)
> 1     | HK   | Aug28-00:00 to Aug29-00:00  | 10:00  <- updating currently through Aug29
> 1     | HK   | Aug27-00:00 to Aug28-00:00  | 09:00  <- not updating, as no new events with
>                                                        activity_timestamp in this range are coming in
>
> The drawback of this approach is that as soon as Aug28 starts, you have to
> wait for a new event about a train to get a new max(time). You may rather
> want a rolling 24 hour period, that is, the max time known over events in
> the last 24 hours.
> Then maintaining our own custom state using 
> mapGroupsWithState/flatMapGroupsWithState()
> is the best and most flexible option.
> It is available in Spark 2.2 in Scala, Java.
>
> Here is an example that tracks sessions based on events.
> Scala - https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
>
> You will have to create a custom per-train state which keeps track of last
> 24 hours of trains history, and use that state to calculate the max time
> for each train.
>
>
> def updateHistoryAndGetMax(train: String, events: Iterator[TrainEvents],
>     state: GroupState[TrainHistory]): Long = {
>   // for every event, update history (i.e. last 24 hours of events) and
>   // return the max time from the history
> }
>
> trainTimesDataset // Dataset[TrainEvents]
>   .groupByKey(_.train)
>   .mapGroupsWithState(updateHistoryAndGetMax)
>
> Hope this helps.
>
>
> On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz  wrote:
>
>> Hey TD,
>>
>> If I understood the question correctly, your solution wouldn't return the
>> exact answer, since it also groups by destination. I would say the
>> easiest solution would be to use flatMapGroupsWithState, where you:
>> .groupByKey(_.train)
>>
>> and keep in state the row with the maximum time.
>>
>> On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Yes. And in that case, if you only care about the last few days of the
>>> max, then you should set a watermark on the timestamp column.
>>>
>>>  trainTimesDataset
>>>    .withWatermark("activity_timestamp", "5 days")
>>>    .groupBy(window($"activity_timestamp", "24 hours", "24 hours"), $"train", $"dest")
>>>    .max("time")
>>>
>>> Any counts which are more than 5 days old will be dropped from the
>>> streaming state.
>>>
>>> On Tue, Aug 29, 2017 at 2:06 PM, kant kodali  wrote:
>>>
 Hi,

 Thanks for the response. Since this is a streaming query, and in my case I
 need to hold state for 24 hours (which I forgot to mention in my previous
 email), can I do the following?

  trainTimesDataset.groupBy(window($"activity_timestamp", "24 hours", "24 hours"), $"train", $"dest").max("time")


 On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <
 tathagata.das1...@gmail.com> wrote:

> Say, trainTimesDataset is the streaming Dataset of schema [train: Int,
> dest: String, time: Timestamp].
>
>
> Scala: trainTimesDataset.groupBy("train", "dest").max("time")
>
> SQL: "select train, dest, max(time) from trainTimesView group by train, dest"
> // after calling trainTimesDataset.createOrReplaceTempView("trainTimesView")
>
>
> On Tue, Aug 29, 2017 at 12:59 PM, kant kodali 
> wrote:
>
>> Hi All,
>>
>> I am wondering what is the easiest and most concise way to express the
>> computation below in Spark Structured Streaming, given that it supports
>> both imperative and declarative styles?
>> I am just trying to select the rows that have the max timestamp for each
>> train. Instead of doing some sort of nested query like we normally would
>> in a relational database, I am trying to see if I can leverage both
>> imperative and declarative styles at the same time. If nested queries or
>> joins are not required then I would like to see 

Different watermark for different kafka partitions in Structured Streaming

2017-08-30 Thread 张万新
Hi,

I'm working with Structured Streaming to process logs from Kafka and use a
watermark to handle late events. Currently the watermark is computed as (max
event time seen by the engine - late threshold), and the same watermark is
used for all partitions.

But in a production environment it happens frequently that different
partitions are consumed at different speeds; the consumption of some
partitions may be left behind, so the newest event time in those partitions
may be much smaller than the others'. In this case, using the same watermark
for all partitions may cause heavy data loss.

So is there any way to achieve a different watermark for each Kafka
partition, or any plan to work on this?
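
For reference, a minimal sketch of how the single, query-wide watermark is
declared today; kafkaLogs, its eventTime column, and the spark session are
placeholders for illustration:

import org.apache.spark.sql.functions.window
import spark.implicits._

// one watermark for the whole query: the max event time seen across all
// partitions minus the lateness threshold
val counts = kafkaLogs
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window($"eventTime", "5 minutes"))
  .count()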
-- 
Wanxin Zhang,
Master candidate,
National Lab for Parallel and Distributed Processing (PDL),
School of Computer Science,
National University of Defense Technology,
Changsha, Hunan, China


spark streaming and reactive streams

2017-08-30 Thread Mich Talebzadeh
Hi,

I just had the idea of reactive streams thrown in.

I was wondering, in practical terms, what it adds to Spark Streaming. What
have we been missing, so to speak?

Thanks,

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.