I'm running a Spark Streaming job off of Kafka. The messages come in like this:

import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val messageStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, Int, Long, String)](
  ssc, getKafkaBrokers(), getKafkaTopics("raw"),
  (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.partition, mmd.offset, mmd.message))
Now as the data comes in, I want to group it by topic and partition so that everything from the same topic/partition can be processed in one batch. What's the right function to use here?

messageStream.foreachRDD(x => x.?)
Is it groupBy? And if it's groupBy, how do I group on the first two parts of the tuple? Each KafkaRDD has many messages in it, and I want to group them into sets of messages so I can process each group as a chunk rather than message by message.
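For reference, RDD.groupBy has the same shape as groupBy on plain Scala collections, so the keying can be sketched without Spark at all. A minimal sketch with made-up sample records in the (topic, partition, offset, message) form above:

```scala
// Sample records in the (topic, partition, offset, message) shape;
// values are invented purely for illustration.
val records = List(
  ("raw", 0, 100L, "first"),
  ("raw", 0, 101L, "second"),
  ("raw", 1, 50L, "third")
)

// Key each record by its first two fields, (topic, partition).
// The result maps that key to every full record sharing it.
val grouped = records.groupBy(r => (r._1, r._2))

// grouped(("raw", 0)) now holds both partition-0 records together.
```

The same `r => (r._1, r._2)` key function is what you would pass to groupBy on the RDD inside foreachRDD.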
Edit: Based on the feedback below, I now have this:

messageStream.foreachRDD(x => x.groupBy(x => (x._1, x._2)).foreach(x => { ? }))
Does this come out as K,V pairs where K is (topic, partition) and the value is (offset, message)? I need the 1st and 2nd parts of the tuple because they let me make an API call to get instructions for the messages. I don't want to call the API for each message individually, because many of them share the same instruction set based on topic/partition.
Edit: I realized it actually comes back as:

K: (topic, partition)  V: CompactBuffer((topic, partition, offset, message), (topic, partition, offset, message), ...)
messageStream.foreachRDD(x => x.groupBy(x => (x._1, x._2)).foreach(x => {
  val topic = x._1._1
  val partition = x._1._2
  x._2.foreach(x => ...)
}))
To group by the first two parts of the tuple, you can try the following:
messagestream groupby (x => (x._1, x._2))
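Putting it together, here is a sketch of processing each group with a single lookup per (topic, partition), again using plain Scala collections to stand in for the RDD. fetchInstructions is a hypothetical stand-in for the real instruction API, not an actual call:

```scala
// Hypothetical stand-in for the instruction-lookup API; the real call
// would go to whatever service holds the per-topic/partition rules.
def fetchInstructions(topic: String, partition: Int): String =
  s"instructions-for-$topic-$partition"

val records = List(
  ("raw", 0, 100L, "first"),
  ("raw", 0, 101L, "second"),
  ("raw", 1, 50L, "third")
)

// One API call per (topic, partition) group, then per-message work.
// Destructuring with `case ((topic, partition), msgs)` is cleaner than
// digging through x._1._1 / x._1._2 by hand.
val processed = records.groupBy(r => (r._1, r._2)).toList.flatMap {
  case ((topic, partition), msgs) =>
    val instructions = fetchInstructions(topic, partition) // once per group
    msgs.map { case (_, _, offset, message) => (instructions, offset, message) }
}
```

Inside foreachRDD the same pattern applies to the grouped RDD, with the API call made once per group instead of once per message.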