I have a Java Lambda function that is triggered by an S3 event every 15 minutes. I've noticed that, over roughly every 3-hour period, each Lambda invocation includes not only the latest file uploaded but also the files uploaded earlier within that 3-hour timespan.
So, when iterating through the entire list, it repeats files that were already processed in an earlier Lambda invocation.
How do I process only the most recently uploaded file? In Node.js there is context.succeed(), which I assume marks the event as processed. Java doesn't seem to have that.
Below are the CloudWatch logs.
08:35:16 START RequestId: 56c0dc17-6f77-11e6-a102-7517541f4ac3 Version: $LATEST
08:35:26 Time - Authenticate: 8101ms
08:35:26 Time - Message Parse: 1ms
08:35:26 data :: event/events/2016/08/31/20160831123000.export.csv
08:35:35 Processed 147 events
08:35:35 Time - File Parse: 9698
08:35:35 Found 1 event files
08:35:35 Total function took: 17800ms
08:35:35 END RequestId: 56c0dc17-6f77-11e6-a102-7517541f4ac3
08:35:35 REPORT RequestId: 56c0dc17-6f77-11e6-a102-7517541f4ac3 Duration: 19403.67 ms Billed Duration: 19500 ms Memory Size: 192 MB Max Memory Used: 116 MB
08:45:03 START RequestId: bcb8e064-6f78-11e6-baea-a312004d2418 Version: $LATEST
08:45:03 Time - Authenticate: 119ms
08:45:03 Time - Message Parse: 0ms
08:45:03 data :: event/events/2016/08/31/20160831123000.export.csv
08:45:05 Processed 147 events
08:45:05 data :: event/events/2016/08/31/20160831124500.export.csv
08:45:06 Processed 211 events
08:45:06 Time - File Parse: 2499
08:45:06 Found 2 event files
08:45:06 Total function took: 2618ms
08:45:06 END RequestId: bcb8e064-6f78-11e6-baea-a312004d2418
08:45:06 REPORT RequestId: bcb8e064-6f78-11e6-baea-a312004d2418 Duration: 2796.25 ms Billed Duration: 2800 ms Memory Size: 192 MB Max Memory Used: 116 MB
09:05:02 START RequestId: 8747aa08-6f7b-11e6-80fd-f30a15cf07fc Version: $LATEST
09:05:02 Time - Authenticate: 98ms
09:05:02 Time - Message Parse: 0ms
09:05:02 data :: event/events/2016/08/31/20160831123000.export.csv
09:05:03 Processed 147 events
09:05:03 data :: event/events/2016/08/31/20160831124500.export.csv
09:05:04 Processed 211 events
09:05:04 data :: event/events/2016/08/31/20160831130000.export.csv
09:05:04 Processed 204 events
09:05:04 Time - File Parse: 2242
09:05:04 Found 3 event files
09:05:04 Total function took: 2340ms
09:05:04 END RequestId: 8747aa08-6f7b-11e6-80fd-f30a15cf07fc
EDIT 1: I believe the question has been answered by Michael; below is the rest of the code. I was indeed using a global list to hold the records.
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.event.S3EventNotification;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.lang3.time.StopWatch;

public class LambdaHandler {

    // Instance-level state; survives between invocations when the container is reused.
    private final List<GDELTEventFile> eventFiles = new ArrayList<>();
    private AmazonS3Client s3Client;
    private final CSVFormat CSV_FORMAT = CSVFormat.TDF.withIgnoreEmptyLines().withTrim();

    public void gdeltHandler(S3Event event, Context context) {
        StopWatch sw = new StopWatch();
        long time = 0L;

        sw.start();
        s3Client = new AmazonS3Client(new EnvironmentVariableCredentialsProvider());
        sw.split();
        System.out.println("Time - Authenticate: " + sw.getSplitTime() + "ms");
        time += sw.getSplitTime();

        sw.reset();
        sw.start();
        processEvent(event);
        sw.split();
        System.out.println("Time - Message Parse: " + sw.getSplitTime() + "ms");
        time += sw.getSplitTime();

        sw.reset();
        sw.start();
        processFiles();
        sw.split();
        System.out.println("Time - File Parse: " + sw.getSplitTime());
        time += sw.getSplitTime();

        System.out.println("Found " + eventFiles.size() + " event files");
        System.out.println("Total function took: " + time + "ms");
    }

    private void processEvent(S3Event event) {
        List<S3EventNotification.S3EventNotificationRecord> records = event.getRecords();
        for (S3EventNotification.S3EventNotificationRecord record : records) {
            long fileSize = record.getS3().getObject().getSizeAsLong();
            eventFiles.add(new GDELTEventFile(record.getS3().getBucket().getName(),
                    record.getS3().getObject().getKey(), fileSize));
        }
    }

    private void processFiles() {
        for (GDELTEventFile event : eventFiles) {
            try {
                System.out.println(event.getBucket() + " :: " + event.getFilename());
                GetObjectRequest request = new GetObjectRequest(event.getBucket(), event.getFilename());
                S3Object file = s3Client.getObject(request);
                try (BufferedReader reader = new BufferedReader(new InputStreamReader(file.getObjectContent()))) {
                    CSVParser parser = new CSVParser(reader, CSV_FORMAT);
                    int count = 0;
                    for (CSVRecord record : parser) {
                        count++;
                    }
                    System.out.println("Processed " + count + " events");
                }
            } catch (IOException ioe) {
                System.out.println("IOException :: " + ioe);
            }
        }
    }
}
This is a case of the code overlooking an important aspect of Lambda's container reuse: container reuse includes process reuse. When a function is executed in a reused container, it is necessarily running in the same process that was used before as well.
S3's event notification data structure is such that it can include more than one object per event, although in practice that never seems to happen... but pushing the event data onto a global structure means that if the container is reused, later function invocations will see the old data.
While this can be useful as a cache, it has significant implications for how code must be designed: expect, but never assume, that the process may survive from one invocation to a future, subsequent invocation, and code accordingly.
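A minimal sketch of that design, assuming the same GDELTEventFile class and handler signature from the question: keep the list of files local to the handler method (or clear any reused state at the start of every invocation), so a reused process never carries records over from a previous event.

import java.util.ArrayList;
import java.util.List;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.amazonaws.services.s3.event.S3EventNotification;

public class LambdaHandler {

    public void gdeltHandler(S3Event event, Context context) {
        // A fresh list for every invocation: nothing survives container/process reuse.
        List<GDELTEventFile> eventFiles = new ArrayList<>();

        for (S3EventNotification.S3EventNotificationRecord record : event.getRecords()) {
            eventFiles.add(new GDELTEventFile(
                    record.getS3().getBucket().getName(),
                    record.getS3().getObject().getKey(),
                    record.getS3().getObject().getSizeAsLong()));
        }

        // ... download and parse only the files referenced by this event ...
    }
}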
Note also that container reuse means you need to clean up your temp files, if there is any chance that many reuses of a container could result in space exhaustion there.
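For example, a small hypothetical helper like the one below could be called at the start of each invocation to clear out anything left in /tmp by earlier runs in the same container:

import java.io.File;

// Hypothetical cleanup helper: /tmp persists across invocations in a reused
// container, so delete leftover files before writing new ones.
private static void cleanTmpDirectory() {
    File[] leftovers = new File("/tmp").listFiles();
    if (leftovers != null) {
        for (File leftover : leftovers) {
            if (leftover.isFile()) {
                leftover.delete();
            }
        }
    }
}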
Note also that redeploying the function code means any old containers are abandoned and will not be reused for future invocations of the latest version.