# http://code.google.com/p/py-amqplib/downloads/list
"""Spool messages from an AMQP queue to disk, one file per message.

The script connects to the broker, declares a queue, binds it to the
``market_order_uploads`` exchange and then consumes forever.  Each non-empty
message body is written to a dot-prefixed temporary file inside the spool
directory and then renamed to its final name.

A SIGALRM watchdog is armed at start-up and re-armed on every received
message; if it fires, the process exits with status 23.
"""
import os
import signal
import sys
import time

from amqplib import client_0_8 as amqp

# Seconds without a message before the watchdog terminates the process.
IDLE_TIMEOUT = 300

msgcount = 0
spooldir = "/tmp/em-qp/"
appname = "slurpee@musuca"
tagname = "slurpee!"
quename = "public_%s_%s" % (appname, "v5",)


class TempNameExists(Exception):
    """Raised when the temporary spool file name is already taken."""


class SpoolNameExists(Exception):
    """Raised when the final spool file name is already taken."""


# Create the spool directory on first run.
if not os.path.exists(spooldir):
    print("DEB: mkdir(%s)\n" % (spooldir,))
    os.mkdir(spooldir)

conn = amqp.Connection(host="queue.eve-metrics.com:5672 ", userid="guest",
                       password="guest", virtual_host="/", insist=False)
chan = conn.channel()
#chan.queue_declare(queue=quename, durable=False, exclusive=True, auto_delete=True)
chan.queue_declare(queue=quename, durable=False, exclusive=False,
                   auto_delete=False)
chan.queue_bind(queue=quename, exchange="market_order_uploads",
                routing_key="")


def timeout(sig, stack):
    """SIGALRM handler: report the idle timeout and exit with status 23."""
    print("timeout, exiting")
    sys.exit(23)


signal.signal(signal.SIGALRM, timeout)
signal.alarm(IDLE_TIMEOUT)


def recv_callback(msg):
    """Write the body of *msg* to a new file in the spool directory.

    Empty bodies are skipped (the message counter is still advanced).

    Raises:
        TempNameExists: the temporary file name already exists.
        SpoolNameExists: the final file name already exists.
    """
    global msgcount

    # Any delivery counts as activity: push the watchdog deadline back.
    signal.alarm(IDLE_TIMEOUT)

    msgcount = msgcount + 1
    fn = "%.3f.%s.%s.yaml" % (time.time(), os.getpid(), msgcount,)

    if not len(msg.body):
        print("DEB: skipping empty")
        return

    print("DEB: picked id '%s' for %i bytes ..." % (fn, len(msg.body),))

    # The temporary name is the final name with a leading dot, in the same
    # directory, so the rename below stays within one directory.
    tn = "%s.%s" % (spooldir, fn,)
    sn = "%s%s" % (spooldir, fn,)

    # Refuse to clobber an existing file under either name.
    if os.path.exists(tn):
        raise TempNameExists(tn)
    if os.path.exists(sn):
        raise SpoolNameExists(sn)

    # Write under the temporary name first; the file only appears under its
    # final name once the body has been written and the handle closed.
    with open(tn, "wb") as f:
        f.write(msg.body)
    os.rename(tn, sn)


chan.basic_consume(queue=quename, no_ack=True, callback=recv_callback,
                   consumer_tag=tagname)

while True:
    chan.wait()

#chan.basic_cancel(tagname)