root/trunk/pysmsqueue/pysmsqueue

Revision 24, 18.0 kB (checked in by tommi, 2 months ago)

Changed indentation on the createdb function. It doesn't actually
do anything yet but work in progress.

  • Property svn:executable set to *
Line 
1 #! /usr/bin/python
2 # -*- coding: UTF-8 -*-
3
4 import pprint
5 import MySQLdb
6 import thread
7 import sys
8 import time
9 import getopt
10 import os
11 import signal
12 import syslog
13 import Queue
14 import string
15 import popen2
16 from socket import gethostname
17
18 # Configure your database settings
19 dbhost = "localhost"
20 dbname = "smsq"
21 dbuser = "smsqagent"
22 dbpass = "rWeM42%rK"
23
24 # Gnokii command
25 gnokii_command = "./fake.sh"
26 # How long to wait for gnokii before failing the phone
27 gnokii_timeout = 60
28
29 # Set the hostname, will try to figure out else
30 hostname = ""
31
32 version = "0.1.4"
33 # Get the hostname
34 if (hostname == ""):
35         hostname = gethostname()
36
37 # Exit with no hostname
38 if (hostname == ""):
39         print >> stderr, "Unable to determine hostname"
40         sys.exit(1)
41
42 phone_states = ( 'idle', 'sending', 'failed' );
43
44
45 def main():
46         # Work with options specified
47         try:
48                 opts, args = getopt.getopt(sys.argv[1:], "FDhvrlem:R:n:c:M:", ["foreground", "list-phones", "delete-phone=", "daemon", "help", "verbose", "register", "name=", "phone-config=",
49                 "max-length=", "debug", "enqueue", "recipient", "message"])
50         except getopt.GetoptError:
51                 # print help information and exit:
52                 usage()
53                 sys.exit(2)
54         # No options
55         if len(sys.argv) < 2:
56                 usage()
57                 sys.exit(2)
58
59         global dbh
60
61         # Set default values
62         enqueue_message = False
63         message = None
64         list_phones = False
65         daemon = False
66         foreground = False
67         background = True
68         verbose = 0
69         register = False
70         register_phone_name = None
71         register_phone_config = None
72         register_phone_max_length = 160
73         delete_phone = None
74         daemonized = False
75
76
77         # Loop through arguments
78         for o, a in opts:
79                 if o in ("--setupdb"):
80                         pass
81                 if o in ("--debug"):
82                         verbose = 99
83                 if o in ("-D", "--daemon"):
84                         daemon = True
85                 if o in ("-F", "--foreground"):
86                         foreground = True
87                 if o in ("-h", "--help"):
88                         usage()
89                         sys.exit(0)
90                 if o in ("-v", "--verbose"):
91                         verbose += 1
92                         background = False
93                 if o in ("-r", "--register"):
94                         register = True
95                 if o in ("-n", "--name"):
96                         register_phone_name = a
97                 if o in ("-c", "--phone-config"):
98                         register_phone_config = a
99                 if o in ("-l", "--list-phones"):
100                         list_phones = True
101                 if o in ("-M", "--max-length"):
102                         register_phone_max_length = int(a)
103                 if o in ("--delete-phone"):
104                         delete_phone = a
105                 if o in ("-e", "--enqueue"):
106                         enqueue_message = True
107                 if o in ("-m", "--message"):
108                         message = a
109                 if o in ("-R", "--recipient"):
110                         recipient = a
111
112         # Intitialize database connection
113         dbh = db()
114
115
116         # Create phone objects from DB
117         phones = dbh.getPhones()
118
119         # No valid arguments passes
120         if delete_phone == None and list_phones == False and register == False and daemon == False and foreground == False and enqueue_message == False:
121                 usage()
122                 sys.exit(2)
123
124         # Start logobj
125         global logobj
126         logobj = logger(foreground, verbose, False)
127
128
129         # Enqueue message
130         if enqueue_message:
131                 logobj.verbosity = 3
132                 if (not recipient):
133                         usage()
134                         sys.exit(2)
135                 if message == "-" or not message:
136                         message = readstdin()
137                 if dbh.queue_message(recipient, message):
138                         sys.exit(2)
139                 sys.exit(0)
140
141         # Delete phone
142         if delete_phone:
143                 if list_phones or daemon or register or foreground:
144                         usage()
145                         sys.exit(2)
146                 for phone in phones:
147                         if str.lower(phone.phonename) == str.lower(delete_phone):
148                                 dbh.deletePhone(phone.phonename)
149                                 sys.exit(0)
150                 print >> stderr, "Phone %s not found" % (delete_phone)
151                 sys.exit(2)
152                                
153         # List phones
154         if list_phones == True:
155                 if daemon or foreground or register or delete_phone:
156                         usage()
157                         sys.exit(2)
158                 print """%-32s %-16s %-24s %-10s %s
159 -----------------------------------------------------------------------------------------------------""" % \
160                         ("Hostname", "Name", "Phone Config File", "State", "SMS Max length")
161                 for phone in phones:
162                         print "%-32s %-16s %-24s %-10s %i" % (phone.phone_host, phone.phonename, phone.config, phone_states[phone.state], phone.maxlength)
163                 sys.exit(0)
164
165         # Phone registration
166         if register == True:
167                 # Incompatible arguments
168                 if list_phones or daemon or foreground or delete_phone:
169                         usage()
170                         sys.exit(2)
171                 if register_phone_name == None:
172                         print >> sys.stderr, "Register defined but not phone name"
173                         sys.exit(2)
174                 if register_phone_config == None:
175                         print >> sys.stderr, "Warning, no phone config defined, defaulting to /etc/gnokiirc"
176                         register_phone_config = "/etc/gnokiirc"
177                 for phone in phones:
178                         if phone.phone_host != hostname:
179                                 pass
180                         elif phone.phonename == register_phone_name:
181                                 print >> sys.stderr, "A phone with the name %s is already registered for this host" % (phone.phonename)
182                                 sys.exit(2)
183                         elif phone.config == register_phone_config:
184                                 print >> sys.stderr, "Phone %s already defined with config file %s" % (phone.phonename, phone.config)
185                                 sys.exit(2)
186
187                 # Phone config exists
188                 try:
189                         os.stat(register_phone_config)
190                 except OSError, e:
191                         print >> sys.stderr, "Phone config not found (%s): %s" % (register_phone_config, e.strerror)
192                         sys.exit(1)
193
194
195                 dbh.registerPhone(register_phone_name, register_phone_config, register_phone_max_length)
196                 print "Phone registered:\n\tName: %s\n\tConfig: %s\n\tMaxlength: %i\n\tHost: %s" % (
197                         register_phone_name, register_phone_config, int(register_phone_max_length), hostname)
198                 sys.exit(0)
199
200         # Daemonize
201         if daemon == True or foreground == True:
202                 if (len(phones) < 1) :
203                         print >> sys.stderr, "No phones defined, add one with -r"
204                         sys.exit(1)
205
206
207         # Chech for duplicate phone configs on this host
208         phone_configs = {}
209         for phone in phones:
210                 if phone.phone_host != hostname:
211                         pass
212                 if phone.config in phone_configs:
213                         print >> sys.stderr, "Duplicate config files found for phone.%s and %s" % (phone.phonename, phone_configs[phone.config])
214                         sys.exit(3)
215                 else:
216                         phone_configs[phone.config] = phone.phonename
217
218
219         # Background unless told not to
220         if not foreground:
221                 try:
222                         if os.fork() > 0:
223                                 sys.exit(0)
224                 except OSError, e:
225                         print >>sys.stderr, "fork #1 failed: %d (%s)" % (e.errno, e.strerror)
226                         sys.exit(1)
227
228                 daemonized = True
229                 syslog.openlog(sys.argv[0], 0, syslog.LOG_DAEMON)
230
231                 try:
232                         os.close(sys.stdin.fileno())
233                         os.close(sys.stdout.fileno())
234                         os.close(sys.stderr.fileno())
235                         os.setsid()
236                 except OSError, e:
237                         print >>sys.stderr, "fork failed: %d (%s)" % (e.errno, e.strerror)
238                         sys.exit(1)
239        
240
241                 try:
242                         if os.fork() > 0:
243                                 sys.exit(0)
244                 except OSError, e:
245                         syslog.syslog(syslog.LOG_ERR, "could not daemonize: %s [%i]" % (e.strerror, e.errno))
246                         sys.exit(1)
247
248         # Start logobj after forking
249         logobj = logger(foreground, verbose, True)
250
251         # Reinit database connection
252         dbh = db()
253         logobj.log("started succesfully, v%s" % (version), syslog.LOG_NOTICE)
254
255         # Clear failed/in use states
256         clearPhoneStates(phones)
257
258         # Create message queue for thread intercommunications
259         mqueue = Queue.Queue()
260
261         # The main loop
262         while True:
263                 mq_message = ""
264                 try:
265                         mq_message = mqueue.get_nowait()
266                 except:
267                         pass
268
269                 # Have message from thread
270                 if mq_message != "":
271                         (phid, mid, mnum, ret) = string.split(mq_message, ";", 3)
272                         logobj.log("mq_message, phoneid=%s, messageid=%s, recipientnumber=%s, ret=%s" % (phid, mid, mnum, ret), syslog.LOG_DEBUG)
273                         newstate = 0
274                         if int(ret) != 0:
275                                 newstate = 2
276                                 # Re-queue message
277                                 dbh.requeue_message(mid)
278                         else:
279                                 dbh.update_message_state(mid, 'sent')
280                                
281                         for phone in phones:
282                                 if phone.phoneid == int(phid):
283                                         if newstate != 0:
284                                                 logobj.log("Phone %s on %s unable to send message to %s, marking phone as failed" % (phone.phonename, phone.phone_host, mnum), syslog.LOG_ERR)
285                                         phone.state = newstate
286                                         dbh.updatePhoneState(phone.phoneid, phone_states[newstate])
287
288                 failed_phones = 0
289                 local_phones = 0
290                 for phone in phones:
291                         if phone.phone_host != hostname:
292                                 continue
293                         local_phones += 1
294                         if phone_states[phone.state] == 'failed':
295                                 failed_phones += 1
296                         elif phone_states[phone.state] == 'idle':
297                                 message = dbh.get_message_locked(phone.phoneid)
298                                 if message != None:
299                                         logobj.log("Message found, starting thread to send, phone=%s number=%s" % (phone.phonename, message['number']), syslog.LOG_DEBUG)
300                                         phone.state = 1
301                                         dbh.updatePhoneState(phone.phoneid, phone_states[1])
302                                         logobj.log("Sending message to %s on phone %s on host %s" % (message['number'], phone.phonename, phone.phone_host), syslog.LOG_INFO)
303                                         thread.start_new_thread(runSender, (phone, message, mqueue))
304                                 else:
305                                         break
306
307                 if failed_phones == local_phones:
308                         logobj.log("All phones have failed, exiting", syslog.LOG_ERR)
309                         sys.exit(2)
310                 time.sleep(1)
311                 logobj.log("main: Main Loop", syslog.LOG_DEBUG)
312
313
314 def readstdin():
315         """Read message from standard input and return it"""
316         message = ""
317         while 1:
318                 line = sys.stdin.readline()
319                 if line == "":
320                         break
321                 message = message + line
322         while message[-1] == '\n':
323                 message = message.rstrip("\n")
324         return message
325        
326 def clearPhoneStates(phones):
327         """Reset all phone states to idle"""
328         for phone in phones:
329                 phone.state = 0
330                 dbh.updatePhoneState(phone.phoneid, phone_states[0])
331
332 class logger:
333         def __init__(self, foreground, verbosity, to_syslog):
334                 """Initialize logger variables and open syslog"""
335                 self.foreground = foreground
336                 self.verbosity = verbosity
337                 self.syslog_active = to_syslog
338                 if foreground == False and self.syslog_active == True:
339                         syslog.openlog(sys.argv[0], 0, syslog.LOG_DAEMON)
340                        
341         def log(self, message, level = 0):
342                 """Send to syslog or stdout"""
343                 if (self.syslog_active == False or self.foreground) and self.verbosity >= level:
344                         print message
345                 elif self.syslog_active == True:
346                         syslog.syslog(level, message)
347
348
349 def runSender(phone, message, mqueue):
350         """Runs the actual gnokii process"""
351         try:
352                 pid = os.fork()
353         except OSError, e:
354                 # Todo deal with fork errors
355                 pass
356                
357         tries = 0
358         if (pid):
359                 ret = 0
360                 retpid = 0
361                 # Loop while waiting for gnokii
362                 while (tries < gnokii_timeout):
363                         logobj.log("runSender reap loop", syslog.LOG_DEBUG)
364                         time.sleep(1)
365                         tries += 1
366                         # Reap the child
367                         (retpid, sts) = os.waitpid(pid, os.WNOHANG)
368                         if retpid and os.WIFEXITED(sts):
369                                 break
370
371                 # Process ran too long, murder and re-queue message.
372                 if retpid == 0:
373                         os.kill(pid, signal.SIGTERM)
374                         mqueue.put("%s;%s;%s;%i" % (phone.phoneid, message['id'], message['number'], 255))
375                 # The exit status was non-ZERO
376                 elif os.WEXITSTATUS(sts) != 0:
377                         mqueue.put("%s;%s;%s;%i" % (phone.phoneid, message['id'], message['number'], os.WEXITSTATUS(sts)))
378                 #
379                 else:
380                         mqueue.put("%s;%s;%s;%i" % (phone.phoneid, message['id'], message['number'], 0))
381         else:
382                 # Gnokii pid
383                 pid = 0
384
385                 # Signal handler to kill child
386                 def killpipe(a, b):
387                         if (pid > 0):
388                                 os.kill(pid, signal.SIGKILL)
389                                 os._exit(1)
390
391                 # Run the piped command to gnokii
392                 try:
393                         pipe = popen2.Popen4("%s --config %s --sendsms %s -r -l %i" % (gnokii_command, phone.config, message['number'], phone.maxlength))
394                         pid = pipe.pid
395                 except OSError, e:
396                         print >> sys.stderr, "popen failed: %d (%s)" % (e.errno, e.strerror)
397                         os._exit(2)
398                
399                 # Register the signal handler
400                 signal.signal(signal.SIGTERM, killpipe)
401                 # Write the message contents to the child
402                 pipe.tochild.write(message["body"])
403                 # Send a EOF to the child
404                 pipe.tochild.close()
405                 # Wait for gnokii to return, will be killed if a sigterm is received
406                 try:
407                         ret = pipe.wait()
408                 # Got killed by parent, timeout
409                 except:
410                         ret = 1
411                 # Weird, sometimes return code is over 255
412                 if (ret > 255):
413                         ret -= 255
414                 os._exit(ret)
415
416 def usage():
417         """Print usage to stdout"""
418         print "Usage " + sys.argv[0] + """ arguments.. 
419
420   General Options
421         -v, --verbose           Be verbose, multiple verbose flags increases verbosity
422         -h, --help              Show help (this bit)
423         --debug                 Run in debug mode, verbose everything
424         -D, --daemon            Run as a daemon
425         -F, --foreground        Run in foreground
426        
427   Messaging
428         -q, --list-queue        List messages in queue
429         -e, --enqueue
430                 -R, --recipient Recipient phone number
431                 -m, --message   Message, - for stdin
432
433   Phone Management
434         -l, --list-phones       List phones
435         --delete-phone=<name>   Delete a phone
436         -r, --register          Register new phone
437                 -n, --name              Phone name (required)
438                 -c, --phone-config      Path to phone config (default /etc/gnokiirc)
439                 -M, --max-length        Maximum characters for sending messages (default 160)
440 """
441
442 class phone:
443         """The phone class keeps info about each phone"""
444         def __init__(self, phonename, id, phone_host, maxlength = 160, config = "/etc/gnokiirc", state = 0):
445                 """Initialize phone object with defaults"""
446                 self.config = config
447                 self.phone_host = phone_host
448                 self.phonename = phonename
449                 self.phoneid = id
450                 self.state = state
451                 self.maxlength = maxlength
452                 # Threading lock
453                 phone_lock = thread.allocate_lock()
454         def sendsms(self, number, message, maxlength = 160):
455                 print "Number: %s\nMessage\n\t%s" % (number, message)
456         def resetFailed(self):
457                 pass
458
459
460 class db:
461         def __init__(self):
462                 """Connect to the database"""
463                 try:
464                         dbh = MySQLdb.connect( host=dbhost, user=dbuser, passwd=dbpass, db=dbname )
465                 except MySQLdb.Error, e:
466                         logobj.log("DB Connect Error %d:\n%s" % (e[0], e[1]), syslog.LOG_ERR)
467                         sys.exit(2)
468
469                 try:
470                         self.dbc = dbh.cursor()
471                 except MySQLdb.Error, e:
472                         logobj.log("DB Cursor Error %d:\n%s" % (e[0], e[1]), syslog.LOG_ERR)
473                         sys.exit(2)
474
475         def requeue_message(self, message_id):
476                 """Requeue a message and remove the phone id"""
477                 logobj.log("Re-queueing message with message_id = %s" % (message_id), syslog.LOG_NOTICE)
478                 self.lock_message_queue()
479                 try:
480                         self.dbc.execute("""UPDATE messagequeue SET phone_id = 0, state = 'queued' WHERE id = %s""", (message_id))
481                 except MySQLdb.Error, e:
482                         logobj.log("DB Error Updating %d:\n%s" % (e[0], e[1]), syslog.LOG_ERR)
483                         sys.exit(2)
484
485                 self.unlock_tables()
486
487         def queue_message(self, recipient, message):
488                 """Queue a message"""
489                 logobj.log("Queueing message to %s" % (recipient), syslog.LOG_NOTICE)
490                 self.lock_message_queue()
491                 try:
492                         self.dbc.execute("""INSERT INTO messagequeue (number, message, hostname) VALUES (%s, %s, %s)""", (recipient, message, hostname))
493                 except MySQLdb.Error, e:
494                         logobj.log("DB Error Inserting %d:\n%s" % (e[0], e[1]), syslog.LOG_ERR)
495                         sys.exit(2)
496                 self.unlock_tables()
497                
498         def update_message_state(self, message_id, state):
499                 """Update the message state"""
500                 self.lock_message_queue()
501                 try:
502                         self.dbc.execute("""UPDATE messagequeue SET state = %s, sent = CURRENT_TIMESTAMP WHERE id = %s""", (state, message_id))
503                 except MySQLdb.Error, e:
504                         logobj.log("DB Error Updating %d:\n%s" % (e[0], e[1]), syslog.LOG_ERR)
505                         sys.exit(2)
506
507                 self.unlock_tables()
508                
509         def get_message_locked(self, phone):
510                 """Get the oldest unsent message from the queue and set the assosiacted phone_id for the phone"""
511                 logobj.log("get_message_locked: Getting messages", syslog.LOG_DEBUG)
512                 self.lock_message_queue()
513                 try:
514                         self.dbc.execute("SELECT id, number, message FROM messagequeue WHERE phone_id = 0 ORDER BY requested")
515                 except MySQLdb.Error, e:
516                         logobj.log("DB Error Selecting %d:\n%s" % (e[0], e[1]), syslog.LOG_ERR)
517                         sys.exit(2)
518
519                
520                 if self.dbc.rowcount == 0:
521                         self.unlock_tables()
522                         return None
523
524                 logobj.log("get_message_locked: %i messages waiting in queue, grabbing one" % self.dbc.rowcount, syslog.LOG_DEBUG)
525                 row = self.dbc.fetchone()
526                 message = { "id" : row[0], "number" : row[1], "body" : row[2] }
527                 try:
528                         self.dbc.execute("""UPDATE messagequeue SET phone_id = %s, state = 'intransit' WHERE id = %s""", (phone, message['id']))
529                 except MySQLdb.Error, e:
530                         logobj.log("DB Error %d:\n%s" % (e[0], e[1]), syslog.LOG_ERR)
531                         sys.exit(2)
532
533                 self.unlock_tables()
534                 return message
535
536         def unlock_tables(self):
537                 """Unlock all table locks"""
538                 try:
539                         self.dbc.execute("UNLOCK TABLES")
540                 except MySQLdb.Error, e:
541                         logobj.log("DB Error Unlocking %d:\n%s" % (e[0], e[1]), syslog.LOG_ERR)
542                         sys.exit(2)
543
544         def lock_message_queue(self):
545                 try:
546                         self.dbc.execute("LOCK TABLES messagequeue WRITE")
547                 except MySQLdb.Error, e:
548                         logobj.log("DB Error Locking %d:\n%s" % (e[0], e[1]), syslog.LOG_ERR)
549                         sys.exit(2)
550        
551         def getPhones(self):
552                 try:
553                         self.dbc.execute("""SELECT phone_host, id, phonename, config, maxlength, state FROM phones""")
554                 except MySQLdb.Error, e:
555                         logobj.log("DB Error Selecting %d:\n%s" % (e[0], e[1]), syslog.LOG_ERR)
556                         sys.exit(2)
557                 phones = []
558                 i = 0
559                 data = {}
560                 while i < self.dbc.rowcount:
561                         row = self.dbc.fetchone()
562                         ii = 0
563                         phoneinfo = {}
564                         while ii < len(self.dbc.description):
565                                 if row[ii] != None:
566                                         phoneinfo[self.dbc.description[ii][0]] = row[ii]
567                                 ii += 1
568                         i += 1
569                         p = phone(**phoneinfo)
570                         phones.append(  p  )
571                 return phones
572         def registerPhone(self, phone_name, phone_config = "/etc/gnokiirc", maxlength = 160):
573                 try:
574                         maxlength = int(maxlength)     
575                         self.dbc.execute("""
576                                 INSERT INTO phones
577                                 (
578                                         phonename,
579                                         config,
580                                         maxlength,
581                                         phone_host
582                                 )
583                                 VALUES
584                                 (
585                                         %s,
586                                         %s,
587                                         %s,
588                                         %s
589                                 )""",
590                                 ( phone_name, phone_config, maxlength, hostname ))
591                 except MySQLdb.Error, e:
592                         logobj.log("DB Error Inserting %d:\n%s" % (e[0], e[1]), syslog.LOG_ERR)
593                         sys.exit(2)
594                 return 0
595         def deletePhone(self, phone_name):
596                 try:
597                         self.dbc.execute("""DELETE FROM phones WHERE phonename = %s""", ( phone_name ))
598                 except MySQLdb.Error, e:
599                         logobj.log("DB Error %d:\n%s" % (e[0], e[1]), syslog.LOG_ERR)
600                         sys.exit(2)
601         def updatePhoneState(self, id, state):
602                 try:
603                         self.dbc.execute("""UPDATE phones SET state = %s WHERE id = %s""", (state, id))
604                 except MySQLdb.Error, e:
605                         logobj.log("DB Error %d:\n%s" % (e[0], e[1]), syslog.LOG_ERR)
606                         sys.exit(2)
607
608         def createDB(self, dbuser, dbpass):
609                 """
610 CREATE TABLE phones (
611         id              int             primary key auto_increment,
612         phonename       char(128)       not null,
613         phone_host      char(128)       not null,
614         config          char(128)       not null,
615         maxlength       int             not null default 160,
616         state           enum('idle', 'sending', 'failed')               not null default 'idle'
617         );
618
619 CREATE TABLE messagequeue (
620         id              int             primary key auto_increment,
621         number          char(32)        not null,
622         message         text            not null,
623         requested       TIMESTAMP       not null default CURRENT_TIMESTAMP,
624         hostname        char(128)       not null,
625         phone_id        int             default 0,
626         state           enum('queued', 'intransit', 'sent') not null default 'queued',
627         sent            TIMESTAMP
628         );
629                 """
630
631 if __name__ == "__main__":
632     main()
633
Note: See TracBrowser for help on using the browser.