I wrote a Spark program that receives data via socketTextStream, and I'm calculating the average of the temperature values. When I stop sending data to the Spark cluster after ~1 minute, the average should not change for the remaining time of the window (1 h): there are still 59 minutes to go in which nothing should change!
Now to the problem I found: while the amount of data is right (100 entries in the windowed DStream), the calculated sum of values (and thus the calculated average, avg = sum / count) fluctuates between a few different values.
Here is a console output snippet (captured after I stopped sending data) of windowedTempJoinPairDStream.print() (sum & count) and windowedTempAvg.print() (average), each a PairDStream<deviceId, [value]>:
-------------------------------------------
Time: 1472801338000 ms
-------------------------------------------
(1-2-a-b-c,(49.159008,100))

-------------------------------------------
Time: 1472801338000 ms
-------------------------------------------
(1-2-a-b-c,0.49159008)

-------------------------------------------
Time: 1472801339000 ms
-------------------------------------------
(1-2-a-b-c,(49.159016,100))

-------------------------------------------
Time: 1472801339000 ms
-------------------------------------------
(1-2-a-b-c,0.49159014)

-------------------------------------------
Time: 1472801340000 ms
-------------------------------------------
(1-2-a-b-c,(49.159008,100))

-------------------------------------------
Time: 1472801340000 ms
-------------------------------------------
(1-2-a-b-c,0.49159008)

-------------------------------------------
Time: 1472801341000 ms
-------------------------------------------
(1-2-a-b-c,(49.159008,100))

-------------------------------------------
Time: 1472801341000 ms
-------------------------------------------
(1-2-a-b-c,0.49159008)

-------------------------------------------
Time: 1472801342000 ms
-------------------------------------------
(1-2-a-b-c,(49.159008,100))

-------------------------------------------
Time: 1472801342000 ms
-------------------------------------------
(1-2-a-b-c,0.49159008)

-------------------------------------------
Time: 1472801343000 ms
-------------------------------------------
(1-2-a-b-c,(49.159008,100))

-------------------------------------------
Time: 1472801343000 ms
-------------------------------------------
(1-2-a-b-c,0.49159008)

-------------------------------------------
Time: 1472801344000 ms
-------------------------------------------
(1-2-a-b-c,(49.15901,100))

-------------------------------------------
Time: 1472801344000 ms
-------------------------------------------
(1-2-a-b-c,0.4915901)
Here are the different average values from above, in short:

(1-2-a-b-c,0.49159008)
(1-2-a-b-c,0.49159014)
(1-2-a-b-c,0.49159008)
(1-2-a-b-c,0.49159008)
(1-2-a-b-c,0.49159008)
(1-2-a-b-c,0.49159008)
(1-2-a-b-c,0.4915901)
To me this seems like a rounding problem, since the temperature values are of type Float. How can I solve this problem, if that is possible? With temperature values of type Integer everything worked fine, no fluctuation...
If it's useful, here is the relevant code snippet of the program:
JavaReceiverInputDStream<String> ingoingStream = streamingContext.socketTextStream(serverIp, 11833);

// 2. Map the DStream<String> to a DStream<SensorData> by deserializing the JSON objects
JavaDStream<SensorData> sensorDStream = ingoingStream.map(new Function<String, SensorData>() {
    public SensorData call(String json) throws Exception {
        ObjectMapper om = new ObjectMapper();
        return (SensorData) om.readValue(json, SensorData.class);
    }
}).cache();

/************************************** Moving average of temperature **************************************/

// Collect the data for a window of time (this is the time period for the average calculation;
// older data is removed from the stream!)
JavaDStream<SensorData> windowMovingAverageSensorDataTemp = sensorDStream.window(windowSizeMovingAverageTemperature);
windowMovingAverageSensorDataTemp.print();

// Map the SensorData stream to a new PairDStream with key = deviceId
// (so we can do the calculations grouped by id).
// .cache() the stream, because we re-use it more than 1 time!
JavaPairDStream<String, SensorData> windowMovingAverageSensorDataTempPairDStream = windowMovingAverageSensorDataTemp
        .mapToPair(new PairFunction<SensorData, String, SensorData>() {
            public Tuple2<String, SensorData> call(SensorData data) throws Exception {
                return new Tuple2<String, SensorData>(data.getIdSensor(), data);
            }
        }).cache();

// a) Map the PairDStream above to a new PairDStream of the form <deviceId, temperature>
// b) Sum the values to a total sum, grouped by key (= device id)
// => both transformations combined, instead of calling them separately (like above)
JavaPairDStream<String, Float> windowMovingAverageSensorDataTempPairDStreamSum = windowMovingAverageSensorDataTempPairDStream
        .mapToPair(new PairFunction<Tuple2<String, SensorData>, String, Float>() {
            public Tuple2<String, Float> call(Tuple2<String, SensorData> sensorDataPair) throws Exception {
                String key = sensorDataPair._1();
                Float value = sensorDataPair._2().getValTemp();
                return new Tuple2<String, Float>(key, value);
            }
        }).reduceByKey(new Function2<Float, Float, Float>() {
            public Float call(Float sumA, Float sumB) throws Exception {
                return sumA + sumB;
            }
        });

// a) Map the PairDStream above to a new PairDStream of the form <deviceId, 1L> to prepare the counting (1 = 1 entry)
// b) Sum the values to a total count of entries, grouped by key (= device id)
// => both calls combined
JavaPairDStream<String, Long> windowMovingAverageSensorDataTempPairDStreamCount = windowMovingAverageSensorDataTempPairDStream
        .mapToPair(new PairFunction<Tuple2<String, SensorData>, String, Long>() {
            public Tuple2<String, Long> call(Tuple2<String, SensorData> sensorDataPair) throws Exception {
                String key = sensorDataPair._1();
                Long value = 1L;
                return new Tuple2<String, Long>(key, value);
            }
        }).reduceByKey(new Function2<Long, Long, Long>() {
            public Long call(Long countA, Long countB) throws Exception {
                return countA + countB;
            }
        });

// Join the sum and count streams, which brings together the data with the same keys (device id).
// Results in a new PairDStream of <deviceId, <sumOfTemp, countOfEntries>>
JavaPairDStream<String, Tuple2<Float, Long>> windowedTempJoinPairDStream = windowMovingAverageSensorDataTempPairDStreamSum
        .join(windowMovingAverageSensorDataTempPairDStreamCount).cache();

// Calculate the average temperature avg = sumOfTemp / countOfEntries, for each key (device id)
JavaPairDStream<String, Float> windowedTempAvg = windowedTempJoinPairDStream
        .mapToPair(new PairFunction<Tuple2<String, Tuple2<Float, Long>>, String, Float>() {
            public Tuple2<String, Float> call(Tuple2<String, Tuple2<Float, Long>> joinedData) throws Exception {
                String key = joinedData._1();
                float tempSum = joinedData._2()._1();
                long count = joinedData._2()._2();
                float avg = tempSum / (float) count;
                return new Tuple2<String, Float>(key, avg);
            }
        });

// Print the joined PairDStream above to check sum & count visually
windowedTempJoinPairDStream.print();

// Print the final, calculated average values for each device id in the form (deviceId, avgTemperature)
windowedTempAvg.print();

// ========================================================= Start the stream =========================================================

// Start streaming & listen until the stream is closed
streamingContext.start();
streamingContext.awaitTermination();
EDIT: Spark app using StatCounter for the average calculation:

I just changed my code to use StatCounter for the average calculation, but I am still getting different average values:
-------------------------------------------
Time: 1473077627000 ms
-------------------------------------------
(1-2-a-b-c,0.4779797872435302)

-------------------------------------------
Time: 1473077628000 ms
-------------------------------------------
(1-2-a-b-c,0.4779797872435303)

-------------------------------------------
Time: 1473077629000 ms
-------------------------------------------
(1-2-a-b-c,0.4779797872435301)

-------------------------------------------
Time: 1473077630000 ms
-------------------------------------------
(1-2-a-b-c,0.4779797872435302)

-------------------------------------------
Time: 1473077631000 ms
-------------------------------------------
(1-2-a-b-c,0.4779797872435301)

-------------------------------------------
Time: 1473077632000 ms
-------------------------------------------
(1-2-a-b-c,0.47797978724353024)

-------------------------------------------
Time: 1473077633000 ms
-------------------------------------------
(1-2-a-b-c,0.47797978724353013)
Here is the new code snippet:
/************************************** Moving average of temperature **************************************/

JavaDStream<SensorData> windowMovingAverageSensorDataTemp = sensorDStream.window(windowSizeMovingAverageTemperature);

JavaPairDStream<String, SensorData> windowMovingAverageSensorDataTempPairDStream = windowMovingAverageSensorDataTemp
        .mapToPair(new PairFunction<SensorData, String, SensorData>() {
            public Tuple2<String, SensorData> call(SensorData data) throws Exception {
                return new Tuple2<String, SensorData>(data.getIdSensor(), data);
            }
        }).cache();

JavaPairDStream<String, StatCounter> preparedAvgPairStream = windowMovingAverageSensorDataTempPairDStream
        .combineByKey(new Function<SensorData, StatCounter>() {
            public StatCounter call(SensorData data) throws Exception {
                return new StatCounter().merge(data.getValTemp());
            }
        }, new Function2<StatCounter, SensorData, StatCounter>() {
            public StatCounter call(StatCounter sc, SensorData sensorData) throws Exception {
                return sc.merge(sensorData.getValTemp());
            }
        }, new Function2<StatCounter, StatCounter, StatCounter>() {
            public StatCounter call(StatCounter sc1, StatCounter sc2) throws Exception {
                return sc1.merge(sc2);
            }
        }, new HashPartitioner(60));

JavaPairDStream<String, Double> avgPairStream = preparedAvgPairStream
        .mapToPair(new PairFunction<Tuple2<String, StatCounter>, String, Double>() {
            public Tuple2<String, Double> call(Tuple2<String, StatCounter> statCounterByKey) throws Exception {
                String key = statCounterByKey._1();
                double value = statCounterByKey._2().mean();
                return new Tuple2<String, Double>(key, value);
            }
        });

avgPairStream.print();
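For scale: the values above differ only in the last one or two significant digits of a double. The following is a minimal standalone sketch (illustrative only, not part of the streaming app; it assumes nothing beyond spark-core on the classpath) showing that the merge order of StatCounter alone can produce differences of about this size:

import org.apache.spark.util.StatCounter;

public class StatCounterOrderDemo {
    public static void main(String[] args) {
        double[] values = {0.1, 0.2, 0.3, 0.4779, 1e-9, 1e6};

        // Merge the same values front-to-back ...
        StatCounter forward = new StatCounter();
        for (double v : values) {
            forward.merge(v);
        }

        // ... and back-to-front
        StatCounter backward = new StatCounter();
        for (int i = values.length - 1; i >= 0; i--) {
            backward.merge(values[i]);
        }

        // The two means typically agree in all but the last few bits --
        // the same magnitude of fluctuation as in the output above
        System.out.println(forward.mean());
        System.out.println(backward.mean());
    }
}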
At least at first glance this is not particularly strange. As you suggested, it is due to rounding errors: since floating-point arithmetic is neither associative nor commutative, and Spark shuffles are nondeterministic, you can expect the results to fluctuate from run to run.
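To make the order dependence concrete, here is a minimal standalone sketch (plain Java, no Spark required) in which the same 101 float values produce two different sums depending only on the order of the additions:

public class FloatOrderDemo {
    public static void main(String[] args) {
        // One large value and 100 small ones
        float[] values = new float[101];
        values[0] = 1e8f;
        for (int i = 1; i < values.length; i++) {
            values[i] = 1f;
        }

        // Front-to-back: every 1f is absorbed by the large partial sum,
        // because floats near 1e8 are spaced 8 apart
        float forward = 0f;
        for (int i = 0; i < values.length; i++) {
            forward += values[i];
        }

        // Back-to-front: the 1f values first accumulate to 100f,
        // which is large enough to survive the final addition
        float backward = 0f;
        for (int i = values.length - 1; i >= 0; i--) {
            backward += values[i];
        }

        System.out.println(forward);   // 1.0E8
        System.out.println(backward);  // 1.00000096E8
    }
}

reduceByKey applies exactly such a chain of additions in whatever order the data happens to be combined, so two runs over the same window can legitimately disagree in the low bits.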
How you can deal with this depends highly on your constraints (see the sketches after this list):

- For starters, computing the average directly is not numerically stable. It is better to use o.a.s.util.StatCounter, which implements a variant of an online algorithm with better numerical properties.
- If you can afford it, you can use arbitrary-precision numbers such as BigDecimal.
- Finally, enforcing a summation order with a little bit of repartition-and-secondary-sort magic can provide consistent (although not necessarily more precise) results.
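As a rough sketch of the last two options (plain Java, illustrative only; in the streaming job the same logic would sit inside a combineByKey, analogous to the StatCounter version above):

import java.math.BigDecimal;
import java.math.MathContext;
import java.util.Arrays;

public class StableSumSketch {

    // Arbitrary precision: BigDecimal addition is exact, so the sum
    // (and hence the average) no longer depends on the addition order.
    static BigDecimal exactSum(float[] temps) {
        BigDecimal sum = BigDecimal.ZERO;
        for (float t : temps) {
            // Float.toString yields the shortest decimal form of the float,
            // so no extra binary rounding error enters the BigDecimal
            sum = sum.add(new BigDecimal(Float.toString(t)));
        }
        return sum;
    }

    // Enforced summation order: the result is still a rounded float,
    // but sorting first makes it reproducible across runs.
    static float orderedSum(float[] temps) {
        float[] sorted = temps.clone();
        Arrays.sort(sorted);
        float sum = 0f;
        for (float t : sorted) {
            sum += t;
        }
        return sum;
    }

    // Example: a stable average derived from the exact sum
    static BigDecimal exactAvg(float[] temps) {
        return exactSum(temps).divide(BigDecimal.valueOf(temps.length), MathContext.DECIMAL64);
    }
}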