I am developing a streaming application that uses the `mapWithState` function internally.
I need to set the checkpointing interval of the checkpointed data manually.
This is the sample code:
// Flag to detect whether creatingFunc built a new StreamingContext (set to true inside it).
var newContextCreated = false

// How often, in batches, the mapWithState state is checkpointed. Spark requires a DStream's
// checkpoint interval to be at least, and a multiple of, its slide (batch) interval, so the
// value is expressed as a batch count rather than raw seconds.
// NOTE(review): 10 is believed to match Spark's built-in mapWithState default (10x the batch
// interval), so behavior is unchanged until this is tuned — confirm for your Spark version.
val checkpointIntervalBatches = 10

/**
 * Creates a new StreamingContext wired with the word-count `mapWithState` pipeline.
 *
 * Intended to be passed as the factory to a get-or-create style call. It also sets
 * `newContextCreated` to true so callers can tell a fresh context from a reused one.
 *
 * Relies on names defined elsewhere: `sc`, `batchIntervalSeconds`, `eventsPerSecond`,
 * `DummySource` and `stateSpec`. `rdd.toDF` additionally needs the SQL implicits in scope.
 *
 * @return the configured, not-yet-started StreamingContext
 */
def creatingFunc(): StreamingContext = {
  val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds))

  // Receiver-based source; presumably emits `eventsPerSecond` lines per second.
  val stream = ssc.receiverStream(new DummySource(eventsPerSecond))

  // Split lines into words and build a paired (word, 1) DStream.
  val wordStream = stream.flatMap(_.split(" ")).map(word => (word, 1))

  // Stream of values emitted by the state spec's mapping function. Since one value is emitted
  // per input record, it has the same number of records as the input DStream.
  val wordCountStateStream = wordStream.mapWithState(stateSpec)

  // Set the state checkpoint interval explicitly instead of relying on Spark's default.
  wordCountStateStream.checkpoint(Seconds(batchIntervalSeconds * checkpointIntervalBatches))
  wordCountStateStream.print()

  // Snapshot of the whole state for the current batch: one entry per key.
  val stateSnapshotStream = wordCountStateStream.stateSnapshots()
  stateSnapshotStream.foreachRDD { rdd =>
    rdd.toDF("word", "count").registerTempTable("batch_word_count")
  }

  // Keep generated RDDs around for a minute so the temp table can be queried interactively.
  ssc.remember(Minutes(1))
  // Directory for metadata and state checkpoints; mapWithState requires one to be set.
  ssc.checkpoint("dbfs:/streaming/trackstate/100")

  println("creating function called create new streamingcontext")
  newContextCreated = true
  ssc
}
Comments
Post a Comment