Re: Is that normal spark performance?

2016-06-15 Thread Deepak Goel
I am not an expert, but it seems all your processing is being done on node1
while node2 is sitting idle.
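
If that is the case, one thing to check is whether the job is even allowed to
claim cores on both workers. A minimal sketch of the driver-side configuration
(the property values are assumptions about your standalone setup, not taken
from your logs):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    SparkConf conf = new SparkConf()
            .setAppName("EquityTCAAnalytics")
            // one contact point is enough; the connector discovers the
            // rest of the Cassandra cluster from its metadata
            .set("spark.cassandra.connection.host", "node1")
            // standalone mode: let the app take cores on both workers
            // (2 nodes x 8 cores) instead of only on node1
            .set("spark.cores.max", "16")
            .set("spark.executor.memory", "1g");
    JavaSparkContext sparkContext = new JavaSparkContext(conf);

The standalone master web UI (port 8080 by default) also shows whether an
executor was actually started on node2.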

On Wed, Jun 15, 2016 at 7:35 PM, Jörn Franke wrote:

> What volume do you have? Why don't you use the corresponding Cassandra
> functionality directly?
> If you run this once, rather than iteratively in memory, you cannot expect
> much improvement.

Re: Is that normal spark performance?

2016-06-15 Thread Jörn Franke

What volume do you have? Why don't you use the corresponding Cassandra
functionality directly?
If you run this once, rather than iteratively in memory, you cannot expect
much improvement.
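
For a single (symbol, date) partition that is one ordered CQL slice which
Cassandra can serve on its own, without Spark. A minimal sketch with the
DataStax Java driver (the contact points are placeholders, and symbol, date,
timeFrom, timeTill are the same variables as in your example):

    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.ResultSet;
    import com.datastax.driver.core.Row;
    import com.datastax.driver.core.Session;

    try (Cluster cluster = Cluster.builder()
            .addContactPoints("node1", "node2")   // placeholder hosts
            .build();
         Session session = cluster.connect()) {
        // (symbol, date) is the full partition key and trade_time the first
        // clustering column, so this reads one partition slice in order
        ResultSet rs = session.execute(
            "SELECT trade_time, reporting_venue, quantity FROM schema.trade "
                + "WHERE symbol = ? AND date = ? "
                + "AND trade_time >= ? AND trade_time < ?",
            symbol, date, timeFrom, timeTill);
        for (Row row : rs) {
            // aggregate row.getInt("quantity") per (time bar,
            // reporting_venue) on the client
        }
    }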

> On 15 Jun 2016, at 16:01, nikita.dobryukha wrote:
>
> We use Cassandra 3.5 + Spark 1.6.1 in a 2-node cluster (8 cores and 1 GB of
> memory per node). There is the following Cassandra table:
> CREATE TABLE schema.trade (
>     symbol text,
>     date int,
>     trade_time timestamp,
>     reporting_venue text,
>     trade_id bigint,
>     ref_trade_id bigint,
>     action_type text,
>     price double,
>     quantity int,
>     condition_code text,
>     PRIMARY KEY ((symbol, date), trade_time, trade_id)
> ) WITH compaction = {'class' : 'DateTieredCompactionStrategy',
>     'base_time_seconds' : '3600', 'max_sstable_age_days' : '93'};
> And I want to calculate the percentage of volume: the sum of all volume from
> trades in the relevant security during the time period, grouped by exchange
> and time bar (1 or 5 minutes). I've created an example:
> void getPercentageOfVolume(String symbol, Integer date, Timestamp timeFrom,
>                            Timestamp timeTill, Integer barWidth) {
>     // milliseconds in one minute
>     final long MILLISECOND_TO_MINUTE_MULTIPLIER = 60_000L;
>     LOG.info("start");
>     JavaPairRDD<Tuple2<Timestamp, String>, Integer> counts =
>         javaFunctions(sparkContext).cassandraTable("schema", "trade")
>             .filter(row ->
>                 row.getString("symbol").equals(symbol) &&
>                 row.getInt("date").equals(date) &&
>                 row.getDateTime("trade_time").getMillis() >= timeFrom.getTime() &&
>                 row.getDateTime("trade_time").getMillis() < timeTill.getTime())
>             .mapToPair(row ->
>                 new Tuple2<>(
>                     new Tuple2<>(
>                         // truncate the trade time down to the start of its bar
>                         new Timestamp((row.getDateTime("trade_time").getMillis()
>                                 / (barWidth * MILLISECOND_TO_MINUTE_MULTIPLIER))
>                                 * barWidth * MILLISECOND_TO_MINUTE_MULTIPLIER),
>                         row.getString("reporting_venue")),
>                     row.getInt("quantity")))
>             .reduceByKey((a, b) -> a + b);
>     LOG.info(counts.collect().toString());
>     LOG.info("finish");
> }
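
This filter runs inside Spark on every row the connector pulls from Cassandra.
If you keep the Spark version, you can at least hand the predicates to
Cassandra through the connector's select/where. A sketch against the 1.6 Java
API (how much of the partition-key restriction is actually pushed down depends
on the connector version):

    javaFunctions(sparkContext).cassandraTable("schema", "trade")
            // fetch only the columns the aggregation needs
            .select("trade_time", "reporting_venue", "quantity")
            // let Cassandra evaluate the slice instead of shipping every
            // row to Spark and filtering there
            .where("symbol = ? AND date = ? AND trade_time >= ? AND trade_time < ?",
                    symbol, date, timeFrom, timeTill)
            // then the same mapToPair / reduceByKey as in your example

That bounds the scan by the predicate instead of reading the whole table.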
> ...
> [2016-06-15 09:25:27.014] [INFO ] [main] [EquityTCAAnalytics] start
> [2016-06-15 09:25:28.000] [INFO ] [main] [NettyUtil] Found Netty's native 
> epoll transport in the classpath, using it
> [2016-06-15 09:25:28.518] [INFO ] [main] [Cluster] New Cassandra host 
> /node1:9042 added
> [2016-06-15 09:25:28.519] [INFO ] [main] [LocalNodeFirstLoadBalancingPolicy] 
> Added host node1 (datacenter1)
> [2016-06-15 09:25:28.519] [INFO ] [main] [Cluster] New Cassandra host 
> /node2:9042 added
> [2016-06-15 09:25:28.520] [INFO ] [main] [CassandraConnector] Connected to 
> Cassandra cluster: Cassandra
> [2016-06-15 09:25:29.115] [INFO ] [main] [SparkContext] Starting job: collect 
> at EquityTCAAnalytics.java:88
> [2016-06-15 09:25:29.385] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] 
> Registering RDD 2 (mapToPair at EquityTCAAnalytics.java:78)
> [2016-06-15 09:25:29.388] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] 
> Got job 0 (collect at EquityTCAAnalytics.java:88) with 5 output partitions
> [2016-06-15 09:25:29.388] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] 
> Final stage: ResultStage 1 (collect at EquityTCAAnalytics.java:88)
> [2016-06-15 09:25:29.389] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] 
> Parents of final stage: List(ShuffleMapStage 0)
> [2016-06-15 09:25:29.391] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] 
> Missing parents: List(ShuffleMapStage 0)
> [2016-06-15 09:25:29.400] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] 
> Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at mapToPair at 
> EquityTCAAnalytics.java:78), which has no missing parents
> [2016-06-15 09:25:29.594] [INFO ] [dag-scheduler-event-loop] [MemoryStore] 
> Block broadcast_0 stored as values in memory (estimated size 10.8 KB, free 
> 10.8 KB)
> [2016-06-15 09:25:29.642] [INFO ] [dag-scheduler-event-loop] [MemoryStore] 
> Block broadcast_0_piece0 stored as bytes in memory (estimated size 5.4 KB, 
> free 16.3 KB)
> [2016-06-15 09:25:29.647] [INFO ] [dispatcher-event-loop-7] 
> [BlockManagerInfo] Added broadcast_0_piece0 in memory on node2:44871 (size: 
> 5.4 KB, free: 2.4 GB)
> [2016-06-15 09:25:29.650] [INFO ] [dag-scheduler-event-loop] [SparkContext] 
> Created broadcast 0 from broadcast at DAGScheduler.scala:1006
> [2016-06-15 09:25:29.658] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] 
> Submitting 5 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at 
> mapToPair at EquityTCAAnalytics.java:78)
> [2016-06-15 09:25:29.661] [INFO ] [dag-scheduler-event-loop] 
> [TaskSchedulerImpl] Adding task set 0.0 with 5 tasks
> [2016-06-15 09:25:30.006] [INFO ] [dispatcher-event-loop-7] 
> [SparkDeploySchedulerBackend] Registered executor NettyRpcEndpointRef(null) 
>