diff options
Diffstat (limited to 'eventStreamer/eventStreamer.py')
| -rwxr-xr-x | eventStreamer/eventStreamer.py | 92 |
1 files changed, 72 insertions, 20 deletions
diff --git a/eventStreamer/eventStreamer.py b/eventStreamer/eventStreamer.py index 9f062e0..c6f9f1f 100755 --- a/eventStreamer/eventStreamer.py +++ b/eventStreamer/eventStreamer.py @@ -14,6 +14,13 @@ parser.add_option("-a", "--accelerate", type="float", dest="acc", default=1.0,he def sqldatetime(t): return string.replace(t.isoformat(),"T"," ") +outsock = socket.socket( socket.AF_INET,socket.SOCK_DGRAM ) +devices={} + +def sendevent(e): + print str(e[3]),str(e[1]),devices[e[3]]['IMSI'] + outsock.sendto( devices[e[3]]['IMSI']+","+str(e[1]), (config.viz_ip, config.viz_port) ) + def main(): startTime=None @@ -25,7 +32,7 @@ def main(): if logtime=="": if date=="": - logtime=str(now.hour)+":"+str(now.minute)+":"+str(now.second+now.microsecond) + logtime=str(now.hour)+":"+str(now.minute)+":"+str(now.second)+"."+str(now.microsecond) else: logtime="0:00:00.0" @@ -38,7 +45,7 @@ def main(): cursor = db.cursor() sql = "SELECT * FROM IMSIs" - devices={} + results=None try: cursor.execute(sql) @@ -50,37 +57,82 @@ def main(): d['ts']=row[3] devices[row[0]]=d except: - print "Error: unable to fetch data" + print "Error: unable to fetch handset list" #get 1st event sql = "SELECT * FROM events Where ts Between '"+sqldatetime(startTime)+"' And '"+sqldatetime(now)+"' LIMIT 1" - outsock = socket.socket( socket.AF_INET,socket.SOCK_DGRAM ) - try: cursor.execute(sql) results = cursor.fetchall() - id=long(results[0][0]) + except: + outsock.sendto( "no event data", (config.viz_ip, config.viz_port) ) + print "Error: no event data" + + if len(results) > 0: + #playback mode + id=long(results[0][0])+1 device_id=results[0][3] now=results[0][2] - print str(results[0][3])+" "+str(results[0][1])+" "+devices[results[0][3]]['IMSI'], (config.viz_ip, config.viz_port) - outsock.sendto( str(results[0][3])+" "+str(results[0][1])+" "+devices[results[0][3]]['IMSI'], (config.viz_ip, config.viz_port) ) + print "playback, next id: ",id + outsock.sendto( "openBTSviz: playback mode: starting at: "+sqldatetime(startTime), (config.viz_ip, config.viz_port) ) + sendevent(results[0]) while True: - id+=1 sql = "SELECT * FROM events Where id = "+str(id) - print sql + try: + cursor.execute(sql) + results = cursor.fetchall() + print "waiting ",(((results[0][2]-now).total_seconds()+(results[0][2]-now).microseconds))*options.acc + time.sleep((((results[0][2]-now).total_seconds()+(results[0][2]-now).microseconds))*options.acc) + now=results[0][2] + sendevent(results[0]) + id+=1 + except: + outsock.sendto( "end of data", (config.viz_ip, config.viz_port) ) + print "end of data" + exit(0) + else: + #realtime mode: get id of last event + sql = "SELECT * FROM events ORDER BY id DESC LIMIT 1" + try: cursor.execute(sql) results = cursor.fetchall() - print "waiting ",(((results[0][2]-now).total_seconds()+(results[0][2]-now).microseconds))*options.acc - time.sleep((((results[0][2]-now).total_seconds()+(results[0][2]-now).microseconds))*options.acc) - now=results[0][2] - print str(results[0][3])+" "+str(results[0][1])+" "+devices[results[0][3]]['IMSI'] - outsock.sendto( str(results[0][3])+" "+str(results[0][1])+" "+devices[results[0][3]]['IMSI'], (config.viz_ip, config.viz_port) ) - except: - outsock.sendto( "end of data", (config.viz_ip, config.viz_port) ) - print "Error: unable to fetch data" - - #2 modes of operation depending whether realtime or historical + id=long(results[0][0])+1 + print "realtime, next id: ",id + outsock.sendto( "openBTSviz: realtime mode: next event id: "+str(id), (config.viz_ip, config.viz_port) ) + except: + #empty table + id=1 + while True: + #have to refresh db connection each time + db.close() + db = MySQLdb.connect(config.mysql_ip,config.mysql_user,config.mysql_pword,config.mysql_db) + cursor = db.cursor() + sql = "SELECT * FROM events Where id = "+str(id) + cursor.execute(sql) + results = cursor.fetchall() + if len(results) > 0: + #check if IMSI is new + if not devices.has_key(results[0][3]): + sql = "SELECT * FROM IMSIs Where id = "+str(results[0][3]) + try: + cursor.execute(sql) + imresults = cursor.fetchall() + for row in imresults: + d={} + d['IMSI']=row[1] + d['number']=row[2] + d['ts']=row[3] + devices[row[0]]=d + print "new IMSI: ",d['IMSI'] + except: + print "Error: unable to fetch event handset" + exit() + sendevent(results[0]) + id+=1 + else: + #event doesn't exist: wait and retry + time.sleep(0.05) db.close() |
