Cassandra Spark Connector and filtering data


I am using Spark 1.3.1 and have written a small program to filter data in Cassandra:

val sc = new SparkContext(conf)
val rdd = sc.cassandraTable("foo", "bar")
val date = DateTime.now().minusHours(1)
val rdd2 = rdd.filter(r => r.getDate("date").after(date.toDate))
println(rdd2.count())
sc.stop()

This program runs for a very long time, printing messages like

16/09/01 21:10:31 INFO Executor: Running task 46.0 in stage 0.0 (TID 46)
16/09/01 21:10:31 INFO TaskSetManager: Finished task 42.0 in stage 0.0 (TID 42) in 20790 ms on localhost (43/1350)

If I terminate the program and change the code to

val date = DateTime.now().minusHours(1)
val rdd2 = rdd.filter(r => r.getDate("date").after(date.toDate))

it still runs for a long time, with messages like

16/09/01 21:14:01 INFO Executor: Running task 8.0 in stage 0.0 (TID 8)
16/09/01 21:14:01 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 19395 ms on localhost (5/1350)

So it seems the program tries to load the entire Cassandra table into memory (or scan it completely) and only then apply the filter. This seems extremely inefficient to me.

How can I write this code so that Spark doesn't load the entire Cassandra table into an RDD (or scan it completely) before applying the filter?

Your first piece of code:

val rdd = sc.cassandraTable("foo", "bar")
val date = DateTime.now().minusDays(30)
rdd.filter(r => r.getDate("date").after(date.toDate)).count // count the filtered RDD

So be careful. RDDs are immutable, so when you apply a filter you need to use the returned RDD and not the one you applied the function to:


val rdd = sc.cassandraTable("foo", "bar")
val date = DateTime.now().minusDays(30)
rdd.filter(r => r.getDate("date").after(date.toDate)) // filters the RDD, but the result is not kept
println(rdd.cassandraCount()) // ignores the filtered RDD and counts the whole table
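
To actually count the filtered rows, keep a reference to the RDD that filter returns. A minimal sketch of that fix, using the same placeholder table as above:

val rdd = sc.cassandraTable("foo", "bar")
val date = DateTime.now().minusDays(30)
val filtered = rdd.filter(r => r.getDate("date").after(date.toDate)) // keep the returned RDD
println(filtered.count()) // counts only the rows that passed the filter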

For more efficiency when reading from Cassandra:

If your date column is a clustering key, you can use the .where function to push the predicate down to Cassandra (see the sketch after the link below). Other than that, there isn't much you can do to prune data server side.

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md#filtering-rows---where
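
A minimal sketch of the pushdown, assuming "date" is a clustering column of the placeholder foo.bar table used above (whether the predicate is accepted depends on your actual schema):

import com.datastax.spark.connector._ // provides cassandraTable on SparkContext
import org.joda.time.DateTime

val date = DateTime.now().minusHours(1)
val recent = sc.cassandraTable("foo", "bar")
  .where("date > ?", date.toDate) // sent to Cassandra as a CQL predicate
println(recent.count())

The count still has to touch every partition, but rows that fail the predicate are filtered inside Cassandra instead of being shipped to Spark and filtered there.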

