java - Spark Streaming: Different average values returned by PairDStream.print -


I wrote a Spark program that receives data from a socketTextStream, and I'm calculating the average of the temperature values. When I stop sending data to the Spark cluster after ~1 min, the average should not change until the time of the window (1 h) is over — for the next 59 min, nothing should change!

Now I found a problem: for me, the data amount is right (100 entries in the windowed DStream), but the calculated sum of the values (and therefore the average, computed as avg = sum / count) fluctuates between a few different values.

Here is a console output snippet (after I stopped sending data) of windowedTempJoinPairDStream.print() (sum & count) and windowedTempAvg.print() (average), each a PairDStream<DeviceId, [Value]>:

------------------------------------------- Time: 1472801338000 ms -------------------------------------------
(1-2-a-b-c,(49.159008,100))
------------------------------------------- Time: 1472801338000 ms -------------------------------------------
(1-2-a-b-c,0.49159008)
------------------------------------------- Time: 1472801339000 ms -------------------------------------------
(1-2-a-b-c,(49.159016,100))
------------------------------------------- Time: 1472801339000 ms -------------------------------------------
(1-2-a-b-c,0.49159014)
------------------------------------------- Time: 1472801340000 ms -------------------------------------------
(1-2-a-b-c,(49.159008,100))
------------------------------------------- Time: 1472801340000 ms -------------------------------------------
(1-2-a-b-c,0.49159008)
------------------------------------------- Time: 1472801341000 ms -------------------------------------------
(1-2-a-b-c,(49.159008,100))
------------------------------------------- Time: 1472801341000 ms -------------------------------------------
(1-2-a-b-c,0.49159008)
------------------------------------------- Time: 1472801342000 ms -------------------------------------------
(1-2-a-b-c,(49.159008,100))
------------------------------------------- Time: 1472801342000 ms -------------------------------------------
(1-2-a-b-c,0.49159008)
------------------------------------------- Time: 1472801343000 ms -------------------------------------------
(1-2-a-b-c,(49.159008,100))
------------------------------------------- Time: 1472801343000 ms -------------------------------------------
(1-2-a-b-c,0.49159008)
------------------------------------------- Time: 1472801344000 ms -------------------------------------------
(1-2-a-b-c,(49.15901,100))
------------------------------------------- Time: 1472801344000 ms -------------------------------------------
(1-2-a-b-c,0.4915901)

Here are the different average values from above, in short:

(1-2-a-b-c,0.49159008)
(1-2-a-b-c,0.49159014)
(1-2-a-b-c,0.49159008)
(1-2-a-b-c,0.49159008)
(1-2-a-b-c,0.49159008)
(1-2-a-b-c,0.49159008)
(1-2-a-b-c,0.4915901)

To me, this looks like a rounding problem, since the temperature values are of type Float. If that's the case: how can I solve this problem?

With temperature values of type Integer it worked fine — no fluctuation...
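That float/integer difference is exactly what you'd see if float addition order were the culprit: float addition is not associative, while integer addition is exact. A minimal standalone demo (the constants here are artificial, chosen only to make the rounding loss visible):

```java
public class FloatAssocDemo {
    // same three float values, two different groupings
    static float grouping1() {
        float a = 1e8f, b = -1e8f, c = 1f;
        return (a + b) + c;   // a and b cancel exactly first, so c survives
    }
    static float grouping2() {
        float a = 1e8f, b = -1e8f, c = 1f;
        return a + (b + c);   // c is absorbed into b (the float ulp at 1e8 is 8)
    }
    // the same with ints: exact arithmetic, grouping never matters
    static int intGrouping1() { return (100000000 + -100000000) + 1; }
    static int intGrouping2() { return 100000000 + (-100000000 + 1); }

    public static void main(String[] args) {
        System.out.println(grouping1());                       // 1.0
        System.out.println(grouping2());                       // 0.0
        System.out.println(intGrouping1() == intGrouping2());  // true
    }
}
```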

In case it's useful, here is the relevant code snippet of my program:

    JavaReceiverInputDStream<String> ingoingStream = streamingContext.socketTextStream(serverIp, 11833);

    // 2. Map the DStream<String> to a DStream<SensorData> by deserializing the JSON objects
    JavaDStream<SensorData> sensorDStream = ingoingStream.map(new Function<String, SensorData>() {
        public SensorData call(String json) throws Exception {
            ObjectMapper om = new ObjectMapper();
            return (SensorData) om.readValue(json, SensorData.class);
        }
    }).cache();

    /************************************ Moving average of temperature ************************************/

    // Collect the data for a window of time (this time period is used for the average calculation; older data is removed from the stream!)
    JavaDStream<SensorData> windowMovingAverageSensorDataTemp = sensorDStream.window(windowSizeMovingAverageTemperature);

    windowMovingAverageSensorDataTemp.print();

    // Map the SensorData stream to a new PairDStream, key = deviceId (so we can do calculations grouped by id)
    // .cache() the stream, because it is re-used more than once!
    JavaPairDStream<String, SensorData> windowMovingAverageSensorDataTempPairDStream = windowMovingAverageSensorDataTemp.mapToPair(new PairFunction<SensorData, String, SensorData>() {
        public Tuple2<String, SensorData> call(SensorData data) throws Exception {
            return new Tuple2<String, SensorData>(data.getIdSensor(), data);
        }
    }).cache();

    // a) Map the PairDStream above to a new PairDStream of the form <deviceId, temperature>
    // b) Sum the values to a total sum, grouped by key (= device id)
    // => combined these 2 transformations; they could also be called separately (like above)
    JavaPairDStream<String, Float> windowMovingAverageSensorDataTempPairDStreamSum = windowMovingAverageSensorDataTempPairDStream.mapToPair(new PairFunction<Tuple2<String, SensorData>, String, Float>() {
        public Tuple2<String, Float> call(Tuple2<String, SensorData> sensorDataPair) throws Exception {
            String key = sensorDataPair._1();
            float value = sensorDataPair._2().getValTemp();
            return new Tuple2<String, Float>(key, value);
        }
    }).reduceByKey(new Function2<Float, Float, Float>() {
        public Float call(Float sumA, Float sumB) throws Exception {
            return sumA + sumB;
        }
    });

    // a) Map the PairDStream above to a new PairDStream of the form <deviceId, 1L> to prepare the counting (1 = 1 entry)
    // b) Sum the values to a total count of entries, grouped by key (= device id)
    // => combined both calls
    JavaPairDStream<String, Long> windowMovingAverageSensorDataTempPairDStreamCount = windowMovingAverageSensorDataTempPairDStream.mapToPair(new PairFunction<Tuple2<String, SensorData>, String, Long>() {
        public Tuple2<String, Long> call(Tuple2<String, SensorData> sensorDataPair) throws Exception {
            String key = sensorDataPair._1();
            long value = 1L;
            return new Tuple2<String, Long>(key, value);
        }
    }).reduceByKey(new Function2<Long, Long, Long>() {
        public Long call(Long countA, Long countB) throws Exception {
            return countA + countB;
        }
    });

    // Join the sum and count streams; this puts together the data with the same keys (device id)
    // Results in a new PairDStream of <deviceId, <sumOfTemp, countOfEntries>>
    JavaPairDStream<String, Tuple2<Float, Long>> windowedTempJoinPairDStream = windowMovingAverageSensorDataTempPairDStreamSum.join(windowMovingAverageSensorDataTempPairDStreamCount).cache();

    // Calculate the average temperature avg = sumOfTemp / countOfEntries, for each key (device id)
    JavaPairDStream<String, Float> windowedTempAvg = windowedTempJoinPairDStream.mapToPair(new PairFunction<Tuple2<String, Tuple2<Float, Long>>, String, Float>() {
        public Tuple2<String, Float> call(Tuple2<String, Tuple2<Float, Long>> joinedData) throws Exception {
            String key = joinedData._1();
            float tempSum = joinedData._2()._1();
            long count = joinedData._2()._2();

            float avg = tempSum / (float) count;
            return new Tuple2<String, Float>(key, avg);
        }
    });

    // Print the joined PairDStream above to check sum & count visually
    windowedTempJoinPairDStream.print();

    // Print the final, calculated average values for each device id in the form (deviceId, avgTemperature)
    windowedTempAvg.print();

    // ============================================ Start stream ============================================

    // Start streaming & listen until the stream is closed
    streamingContext.start();
    streamingContext.awaitTermination();
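The reduceByKey over Float values in the snippet above is where order sensitivity enters: Spark may combine the partial sums in a different order on each batch. The effect can be reproduced without Spark by folding the same multiset of floats in two different orders (the constants are artificial, chosen to exaggerate what is normally a last-digit effect):

```java
import java.util.Arrays;

public class SumOrderDemo {
    // left-to-right fold, the same shape as a reduce over one partition
    static float sum(float[] xs) {
        float s = 0f;
        for (float x : xs) s += x;
        return s;
    }

    // order A: big value first, so every 1.0f is lost to rounding
    static float[] orderA() {
        float[] xs = new float[102];
        xs[0] = 1e8f;
        Arrays.fill(xs, 1, 101, 1.0f);
        xs[101] = -1e8f;
        return xs;
    }

    // order B: the big values cancel first, so the 1.0f values survive
    static float[] orderB() {
        float[] xs = new float[102];
        xs[0] = 1e8f;
        xs[1] = -1e8f;
        Arrays.fill(xs, 2, 102, 1.0f);
        return xs;
    }

    public static void main(String[] args) {
        System.out.println(sum(orderA())); // 0.0
        System.out.println(sum(orderB())); // 100.0
    }
}
```

Both arrays hold exactly the same 102 values; only the order differs, yet the sums disagree — the same mechanism, at a much smaller scale, that makes the windowed sum wobble between 49.159008 and 49.159016.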

Edit: Spark app using StatCounter for the average calculation:

I just changed my code to work with StatCounter for the average calculation, but I'm still getting different average values:

------------------------------------------- Time: 1473077627000 ms -------------------------------------------
(1-2-a-b-c,0.4779797872435302)
------------------------------------------- Time: 1473077628000 ms -------------------------------------------
(1-2-a-b-c,0.4779797872435303)
------------------------------------------- Time: 1473077629000 ms -------------------------------------------
(1-2-a-b-c,0.4779797872435301)
------------------------------------------- Time: 1473077630000 ms -------------------------------------------
(1-2-a-b-c,0.4779797872435302)
------------------------------------------- Time: 1473077631000 ms -------------------------------------------
(1-2-a-b-c,0.4779797872435301)
------------------------------------------- Time: 1473077632000 ms -------------------------------------------
(1-2-a-b-c,0.47797978724353024)
------------------------------------------- Time: 1473077633000 ms -------------------------------------------
(1-2-a-b-c,0.47797978724353013)

Here is the new code snippet:

    /************************************ Moving average of temperature ************************************/

    JavaDStream<SensorData> windowMovingAverageSensorDataTemp = sensorDStream.window(windowSizeMovingAverageTemperature);

    JavaPairDStream<String, SensorData> windowMovingAverageSensorDataTempPairDStream = windowMovingAverageSensorDataTemp.mapToPair(new PairFunction<SensorData, String, SensorData>() {
        public Tuple2<String, SensorData> call(SensorData data) throws Exception {
            return new Tuple2<String, SensorData>(data.getIdSensor(), data);
        }
    }).cache();

    JavaPairDStream<String, StatCounter> preparedAvgPairStream = windowMovingAverageSensorDataTempPairDStream.combineByKey(new Function<SensorData, StatCounter>() {
        public StatCounter call(SensorData data) throws Exception {
            return new StatCounter().merge(data.getValTemp());
        }
    }, new Function2<StatCounter, SensorData, StatCounter>() {
        public StatCounter call(StatCounter sc, SensorData sensorData) throws Exception {
            return sc.merge(sensorData.getValTemp());
        }
    }, new Function2<StatCounter, StatCounter, StatCounter>() {
        public StatCounter call(StatCounter sc1, StatCounter sc2) throws Exception {
            return sc1.merge(sc2);
        }
    }, new HashPartitioner(60));

    JavaPairDStream<String, Double> avgPairStream = preparedAvgPairStream.mapToPair(new PairFunction<Tuple2<String, StatCounter>, String, Double>() {
        public Tuple2<String, Double> call(Tuple2<String, StatCounter> statCounterByKey) throws Exception {
            String key = statCounterByKey._1();
            double value = statCounterByKey._2().mean();
            return new Tuple2<String, Double>(key, value);
        }
    });

    avgPairStream.print();
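For reference, StatCounter maintains the mean incrementally rather than as a separate sum and count. A minimal plain-Java sketch of that style of online update (an illustration of the idea only, not Spark's actual implementation): since each merge still rounds in double precision and the cross-partition merge order is nondeterministic, tiny last-digit fluctuations like the ones in the output above can remain.

```java
public class OnlineMean {
    private long n = 0;
    private double mu = 0.0;

    // fold one observation into the running mean (Welford-style update)
    public OnlineMean merge(double x) {
        n += 1;
        mu += (x - mu) / n;
        return this;
    }

    public double mean() { return mu; }
    public long count() { return n; }

    public static void main(String[] args) {
        OnlineMean m = new OnlineMean().merge(0.1).merge(0.2).merge(0.3);
        System.out.println(m.mean());  // ~0.2
        System.out.println(m.count()); // 3
    }
}
```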

At least at first glance this is not particularly strange. As you suggested, it is due to rounding errors. Since floating-point arithmetic is neither associative nor commutative, and Spark shuffles are nondeterministic, you can expect the results to fluctuate from run to run.

How you can deal with it depends highly on your constraints:

  • For starters, computing the average directly is not numerically stable. It is better to use o.a.s.util.StatCounter, which implements a variant of the online algorithm and has better numerical properties.
  • If you can afford it, you can use arbitrary-precision numbers such as BigDecimal.
  • Finally, enforcing the summation order with a little bit of repartitioning and secondary-sort magic can provide consistent (although not necessarily precise) results.
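As a sketch of the BigDecimal option from the second bullet (a hypothetical helper, not code from the question): decimal addition here is exact and therefore associative, so the sum — and hence the average — no longer depends on the order in which the values are combined; rounding happens exactly once, at the final division.

```java
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;

public class ExactAverage {
    // exact, order-independent sum; round once at the end
    static BigDecimal average(float[] temps) {
        BigDecimal sum = BigDecimal.ZERO;
        for (float t : temps) {
            // Float.toString yields the shortest decimal form of the float
            sum = sum.add(new BigDecimal(Float.toString(t)));
        }
        return sum.divide(BigDecimal.valueOf(temps.length),
                          new MathContext(10, RoundingMode.HALF_UP));
    }

    public static void main(String[] args) {
        float[] a = {0.1f, 0.2f, 0.3f};
        float[] b = {0.3f, 0.1f, 0.2f}; // same values, different order
        System.out.println(average(a)); // 0.2
        System.out.println(average(b)); // 0.2
    }
}
```

The trade-off is cost: BigDecimal arithmetic is far slower than primitive float/double operations, which matters inside a hot reduceByKey.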
