I am using Spark 1.3.1 and have written a small program to filter data on Cassandra:
val sc = new SparkContext(conf)
val rdd = sc.cassandraTable("foo", "bar")
val date = DateTime.now().minusHours(1)
val rdd2 = rdd.filter(r => r.getDate("date").after(date.toDate))
println(rdd2.count())
sc.stop()
This program runs for a long time, printing messages like:
16/09/01 21:10:31 INFO Executor: Running task 46.0 in stage 0.0 (TID 46)
16/09/01 21:10:31 INFO TaskSetManager: Finished task 42.0 in stage 0.0 (TID 42) in 20790 ms on localhost (43/1350)
If I terminate the program and change the code to
val date = DateTime.now().minusHours(1)
val rdd2 = rdd.filter(r => r.getDate("date").after(date.toDate))
it still runs for a long time, with messages like:
16/09/01 21:14:01 INFO Executor: Running task 8.0 in stage 0.0 (TID 8)
16/09/01 21:14:01 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 19395 ms on localhost (5/1350)
So it seems the program tries to load the entire Cassandra table into memory (or scan it completely) and only then apply the filter. This seems extremely inefficient to me.
How can I write this code in a better way, so that Spark doesn't load the entire Cassandra table into an RDD (or scan it completely) before applying the filter?
Your first piece of code should be:
val rdd = sc.cassandratable("foo", "bar") val date = datetime.now().minusdays(30) rdd.filter(r => r.getdate("date").after(date.todate)).count // count filtered rdd
So be careful: RDDs are immutable, so when you apply a filter you need to use the RDD that is returned, not the one you applied the function to. The following, by contrast, filters the RDD but never uses the result:
val rdd = sc.cassandratable("foo", "bar") val date = datetime.now().minusdays(30) rdd.filter(r => r.getdate("date").after(date.todate)) // filters rdd println(rdd.cassandracount()) // ignores filtered rdd , counts
For more efficiency when reading from Cassandra:
If the date column is a clustering key, you can use the .where function to push the predicate down to Cassandra. Other than that, there isn't much you can do to prune the data server-side.
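As an illustration, here is a minimal sketch of the pushdown approach, assuming the "date" column of the "bar" table is a clustering column of CQL type timestamp (the keyspace, table, and column names are the ones from the question; the SparkConf settings for the Cassandra host are assumed to be configured elsewhere):

import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._
import org.joda.time.DateTime

val conf = new SparkConf().setAppName("filter-by-date") // assumes Cassandra connection settings are already set
val sc = new SparkContext(conf)

val date = DateTime.now().minusHours(1)

// .where appends a CQL predicate to the query the connector sends to Cassandra,
// so only rows matching the condition are read, instead of scanning the whole
// table and filtering on the Spark side.
val recent = sc.cassandraTable("foo", "bar").where("date > ?", date.toDate)

println(recent.count())
sc.stop()

Whether Cassandra can serve such a range query efficiently still depends on how the table is partitioned, so treat this as a sketch of the API rather than a performance guarantee.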