What volume of data do you have? Why don't you use the corresponding Cassandra functionality directly? If you run this once rather than iteratively in memory, you can't expect much improvement.
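Since (symbol, date) is the partition key and trade_time is the first clustering column, Cassandra can serve this exact slice itself, without a Spark full-table scan. A sketch of the equivalent CQL read (the bind markers are placeholders, not from the original post):

```sql
-- Sketch: partition-key lookup plus clustering-range slice that Cassandra
-- can answer directly; aggregation by venue/time bar would still happen
-- client-side or in Spark, but only over this slice.
SELECT trade_time, reporting_venue, quantity
FROM schema.trade
WHERE symbol = ? AND date = ?
  AND trade_time >= ? AND trade_time < ?;
```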
> On 15 Jun 2016, at 16:01, nikita.dobryukha <n.dobryu...@gmail.com> wrote:
>
> We use Cassandra 3.5 + Spark 1.6.1 in a 2-node cluster (8 cores and 1 GB memory
> per node). There is the following Cassandra table:
>
> CREATE TABLE schema.trade (
>     symbol text,
>     date int,
>     trade_time timestamp,
>     reporting_venue text,
>     trade_id bigint,
>     ref_trade_id bigint,
>     action_type text,
>     price double,
>     quantity int,
>     condition_code text,
>     PRIMARY KEY ((symbol, date), trade_time, trade_id)
> ) WITH compaction = {'class' : 'DateTieredCompactionStrategy',
>     'base_time_seconds' : '3600', 'max_sstable_age_days' : '93'};
>
> And I want to calculate the percentage of volume: the sum of all volume from
> trades in the relevant security during the time period, grouped by exchange
> and time bar (1 or 5 minutes). I've created an example:
>
> void getPercentageOfVolume(String symbol, Integer date, Timestamp timeFrom,
>                            Timestamp timeTill, Integer barWidth) {
>     long MILLISECOND_TO_MINUTE_MULTIPLIER = 60_000L;
>     LOG.info("start");
>     JavaPairRDD<Tuple2, Integer> counts =
>         javaFunctions(sparkContext).cassandraTable("schema", "trade")
>             .filter(row ->
>                 row.getString("symbol").equals(symbol)
>                     && row.getInt("date").equals(date)
>                     && row.getDateTime("trade_time").getMillis() >= timeFrom.getTime()
>                     && row.getDateTime("trade_time").getMillis() < timeTill.getTime())
>             .mapToPair(row ->
>                 new Tuple2<>(
>                     new Tuple2(
>                         new Timestamp(
>                             (row.getDateTime("trade_time").getMillis()
>                                 / (barWidth * MILLISECOND_TO_MINUTE_MULTIPLIER))
>                                 * barWidth * MILLISECOND_TO_MINUTE_MULTIPLIER),
>                         row.getString("reporting_venue")),
>                     row.getInt("quantity")))
>             .reduceByKey((a, b) -> a + b);
>     LOG.info(counts.collect().toString());
>     LOG.info("finish");
> }
> ...
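The bar-flooring arithmetic in the mapToPair step can be checked in isolation. A minimal sketch (class and method names are illustrative, not from the original code), using a long constant rather than the narrower type in the post:

```java
// Floor a trade timestamp (epoch millis) to the start of its 1- or
// 5-minute bar, mirroring the division-then-multiplication in the
// mapToPair step above.
public class BarBucketing {
    static final long MILLIS_PER_MINUTE = 60_000L;

    static long floorToBar(long epochMillis, int barWidthMinutes) {
        long barMillis = barWidthMinutes * MILLIS_PER_MINUTE;
        // Integer division discards the remainder, so multiplying back
        // yields the bar's start time.
        return (epochMillis / barMillis) * barMillis;
    }

    public static void main(String[] args) {
        // 2016-06-10 13:45:10.123 UTC floors to 13:45:00 with a 5-minute bar.
        System.out.println(floorToBar(1465566310123L, 5)); // prints 1465566300000
    }
}
```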
> [2016-06-15 09:25:27.014] [INFO ] [main] [EquityTCAAnalytics] start
> [2016-06-15 09:25:28.000] [INFO ] [main] [NettyUtil] Found Netty's native epoll transport in the classpath, using it
> [2016-06-15 09:25:28.518] [INFO ] [main] [Cluster] New Cassandra host /node1:9042 added
> [2016-06-15 09:25:28.519] [INFO ] [main] [LocalNodeFirstLoadBalancingPolicy] Added host node1 (datacenter1)
> [2016-06-15 09:25:28.519] [INFO ] [main] [Cluster] New Cassandra host /node2:9042 added
> [2016-06-15 09:25:28.520] [INFO ] [main] [CassandraConnector] Connected to Cassandra cluster: Cassandra
> [2016-06-15 09:25:29.115] [INFO ] [main] [SparkContext] Starting job: collect at EquityTCAAnalytics.java:88
> [2016-06-15 09:25:29.385] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] Registering RDD 2 (mapToPair at EquityTCAAnalytics.java:78)
> [2016-06-15 09:25:29.388] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] Got job 0 (collect at EquityTCAAnalytics.java:88) with 5 output partitions
> [2016-06-15 09:25:29.388] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] Final stage: ResultStage 1 (collect at EquityTCAAnalytics.java:88)
> [2016-06-15 09:25:29.389] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] Parents of final stage: List(ShuffleMapStage 0)
> [2016-06-15 09:25:29.391] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] Missing parents: List(ShuffleMapStage 0)
> [2016-06-15 09:25:29.400] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at mapToPair at EquityTCAAnalytics.java:78), which has no missing parents
> [2016-06-15 09:25:29.594] [INFO ] [dag-scheduler-event-loop] [MemoryStore] Block broadcast_0 stored as values in memory (estimated size 10.8 KB, free 10.8 KB)
> [2016-06-15 09:25:29.642] [INFO ] [dag-scheduler-event-loop] [MemoryStore] Block broadcast_0_piece0 stored as bytes in memory (estimated size 5.4 KB, free 16.3 KB)
> [2016-06-15 09:25:29.647] [INFO ] [dispatcher-event-loop-7] [BlockManagerInfo] Added broadcast_0_piece0 in memory on node2:44871 (size: 5.4 KB, free: 2.4 GB)
> [2016-06-15 09:25:29.650] [INFO ] [dag-scheduler-event-loop] [SparkContext] Created broadcast 0 from broadcast at DAGScheduler.scala:1006
> [2016-06-15 09:25:29.658] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] Submitting 5 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at mapToPair at EquityTCAAnalytics.java:78)
> [2016-06-15 09:25:29.661] [INFO ] [dag-scheduler-event-loop] [TaskSchedulerImpl] Adding task set 0.0 with 5 tasks
> [2016-06-15 09:25:30.006] [INFO ] [dispatcher-event-loop-7] [SparkDeploySchedulerBackend] Registered executor NettyRpcEndpointRef(null) (node1:41122) with ID 0
> [2016-06-15 09:25:30.040] [INFO ] [dispatcher-event-loop-7] [TaskSetManager] Starting task 0.0 in stage 0.0 (TID 0, node1, partition 0,NODE_LOCAL, 11725 bytes)
> [2016-06-15 09:25:30.051] [INFO ] [dispatcher-event-loop-7] [TaskSetManager] Starting task 1.0 in stage 0.0 (TID 1, node1, partition 1,NODE_LOCAL, 11317 bytes)
> [2016-06-15 09:25:30.054] [INFO ] [dispatcher-event-loop-7] [TaskSetManager] Starting task 2.0 in stage 0.0 (TID 2, node1, partition 2,NODE_LOCAL, 11929 bytes)
> [2016-06-15 09:25:30.057] [INFO ] [dispatcher-event-loop-7] [TaskSetManager] Starting task 3.0 in stage 0.0 (TID 3, node1, partition 3,NODE_LOCAL, 11249 bytes)
> [2016-06-15 09:25:30.059] [INFO ] [dispatcher-event-loop-7] [TaskSetManager] Starting task 4.0 in stage 0.0 (TID 4, node1, partition 4,NODE_LOCAL, 11560 bytes)
> [2016-06-15 09:25:30.077] [INFO ] [dispatcher-event-loop-7] [SparkDeploySchedulerBackend] Registered executor NettyRpcEndpointRef(null) (CassandraCH4.ehubprod.local:33668) with ID 1
> [2016-06-15 09:25:30.111] [INFO ] [dispatcher-event-loop-4] [BlockManagerMasterEndpoint] Registering block manager node1:36512 with 511.1 MB RAM, BlockManagerId(0, node1, 36512)
> [2016-06-15 09:25:30.168] [INFO ] [dispatcher-event-loop-3] [BlockManagerMasterEndpoint] Registering block manager CassandraCH4.ehubprod.local:33610 with 511.1 MB RAM, BlockManagerId(1, CassandraCH4.ehubprod.local, 33610)
> [2016-06-15 09:25:30.818] [INFO ] [dispatcher-event-loop-2] [BlockManagerInfo] Added broadcast_0_piece0 in memory on node1:36512 (size: 5.4 KB, free: 511.1 MB)
> [2016-06-15 09:25:36.764] [INFO ] [pool-21-thread-1] [CassandraConnector] Disconnected from Cassandra cluster: Cassandra
> [2016-06-15 09:25:48.914] [INFO ] [task-result-getter-0] [TaskSetManager] Finished task 4.0 in stage 0.0 (TID 4) in 18854 ms on node1 (1/5)
> [2016-06-15 09:25:55.541] [INFO ] [task-result-getter-1] [TaskSetManager] Finished task 2.0 in stage 0.0 (TID 2) in 25489 ms on node1 (2/5)
> [2016-06-15 09:25:57.837] [INFO ] [task-result-getter-2] [TaskSetManager] Finished task 1.0 in stage 0.0 (TID 1) in 27795 ms on node1 (3/5)
> [2016-06-15 09:25:57.931] [INFO ] [task-result-getter-3] [TaskSetManager] Finished task 0.0 in stage 0.0 (TID 0) in 27919 ms on node1 (4/5)
> [2016-06-15 09:26:01.357] [INFO ] [task-result-getter-0] [TaskSetManager] Finished task 3.0 in stage 0.0 (TID 3) in 31302 ms on node1 (5/5)
> [2016-06-15 09:26:01.358] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] ShuffleMapStage 0 (mapToPair at EquityTCAAnalytics.java:78) finished in 31.602 s
> [2016-06-15 09:26:01.360] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] looking for newly runnable stages
> [2016-06-15 09:26:01.360] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] running: Set()
> [2016-06-15 09:26:01.360] [INFO ] [task-result-getter-0] [TaskSchedulerImpl] Removed TaskSet 0.0, whose tasks have all completed, from pool
> [2016-06-15 09:26:01.362] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] waiting: Set(ResultStage 1)
> [2016-06-15 09:26:01.362] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] failed: Set()
> [2016-06-15 09:26:01.365] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] Submitting ResultStage 1 (ShuffledRDD[3] at reduceByKey at EquityTCAAnalytics.java:87), which has no missing parents
> [2016-06-15 09:26:01.373] [INFO ] [dag-scheduler-event-loop] [MemoryStore] Block broadcast_1 stored as values in memory (estimated size 3.6 KB, free 19.9 KB)
> [2016-06-15 09:26:01.382] [INFO ] [dag-scheduler-event-loop] [MemoryStore] Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.1 KB, free 21.9 KB)
> [2016-06-15 09:26:01.383] [INFO ] [dispatcher-event-loop-1] [BlockManagerInfo] Added broadcast_1_piece0 in memory on node2:44871 (size: 2.1 KB, free: 2.4 GB)
> [2016-06-15 09:26:01.384] [INFO ] [dag-scheduler-event-loop] [SparkContext] Created broadcast 1 from broadcast at DAGScheduler.scala:1006
> [2016-06-15 09:26:01.385] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] Submitting 5 missing tasks from ResultStage 1 (ShuffledRDD[3] at reduceByKey at EquityTCAAnalytics.java:87)
> [2016-06-15 09:26:01.386] [INFO ] [dag-scheduler-event-loop] [TaskSchedulerImpl] Adding task set 1.0 with 5 tasks
> [2016-06-15 09:26:01.390] [INFO ] [dispatcher-event-loop-4] [TaskSetManager] Starting task 0.0 in stage 1.0 (TID 5, node1, partition 0,NODE_LOCAL, 2786 bytes)
> [2016-06-15 09:26:01.390] [INFO ] [dispatcher-event-loop-4] [TaskSetManager] Starting task 1.0 in stage 1.0 (TID 6, node1, partition 1,NODE_LOCAL, 2786 bytes)
> [2016-06-15 09:26:01.397] [INFO ] [dispatcher-event-loop-4] [TaskSetManager] Starting task 2.0 in stage 1.0 (TID 7, node1, partition 2,NODE_LOCAL, 2786 bytes)
> [2016-06-15 09:26:01.398] [INFO ] [dispatcher-event-loop-4] [TaskSetManager] Starting task 3.0 in stage 1.0 (TID 8, node1, partition 3,NODE_LOCAL, 2786 bytes)
> [2016-06-15 09:26:01.406] [INFO ] [dispatcher-event-loop-4] [TaskSetManager] Starting task 4.0 in stage 1.0 (TID 9, node1, partition 4,NODE_LOCAL, 2786 bytes)
> [2016-06-15 09:26:01.429] [INFO ] [dispatcher-event-loop-4] [BlockManagerInfo] Added broadcast_1_piece0 in memory on node1:36512 (size: 2.1 KB, free: 511.1 MB)
> [2016-06-15 09:26:01.452] [INFO ] [dispatcher-event-loop-6] [MapOutputTrackerMasterEndpoint] Asked to send map output locations for shuffle 0 to node1:41122
> [2016-06-15 09:26:01.456] [INFO ] [dispatcher-event-loop-6] [MapOutputTrackerMaster] Size of output statuses for shuffle 0 is 161 bytes
> [2016-06-15 09:26:01.526] [INFO ] [task-result-getter-1] [TaskSetManager] Finished task 4.0 in stage 1.0 (TID 9) in 128 ms on node1 (1/5)
> [2016-06-15 09:26:01.575] [INFO ] [task-result-getter-3] [TaskSetManager] Finished task 2.0 in stage 1.0 (TID 7) in 184 ms on node1 (2/5)
> [2016-06-15 09:26:01.580] [INFO ] [task-result-getter-2] [TaskSetManager] Finished task 0.0 in stage 1.0 (TID 5) in 193 ms on node1 (3/5)
> [2016-06-15 09:26:01.589] [INFO ] [task-result-getter-3] [TaskSetManager] Finished task 1.0 in stage 1.0 (TID 6) in 199 ms on node1 (4/5)
> [2016-06-15 09:26:01.599] [INFO ] [task-result-getter-2] [TaskSetManager] Finished task 3.0 in stage 1.0 (TID 8) in 200 ms on node1 (5/5)
> [2016-06-15 09:26:01.599] [INFO ] [task-result-getter-2] [TaskSchedulerImpl] Removed TaskSet 1.0, whose tasks have all completed, from pool
> [2016-06-15 09:26:01.599] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] ResultStage 1 (collect at EquityTCAAnalytics.java:88) finished in 0.202 s
> [2016-06-15 09:26:01.612] [INFO ] [main] [DAGScheduler] Job 0 finished: collect at EquityTCAAnalytics.java:88, took 32.496470 s
> [2016-06-15 09:26:01.634] [INFO ] [main] [EquityTCAAnalytics] [((2016-06-10 13:45:00.0,DA),6944), ((2016-06-10 14:25:00.0,B),5241), ..., ((2016-06-10 10:55:00.0,QD),109080), ((2016-06-10 14:55:00.0,A),1300)]
> [2016-06-15 09:26:01.641] [INFO ] [main] [EquityTCAAnalytics] finish
>
> Is 32.5 s normal?
>
> View this message in context: Is that normal spark performance?
> Sent from the Apache Spark User List mailing list archive at Nabble.com.