AMQP? Maybe not yet. At least not with Python.
In my last Python project I thought about using AMQP messaging. A quick look at Python AMQP libraries shows two possibilities:
- Apache Qpid python client
- Barry’s py-amqplib
Apache Qpid
I started my adventure by downloading the source. They don’t have binary packages so this is what’s needed:
$ wget http://www.apache.org/dist/incubator/qpid/M3-incubating/qpid-incubating-M3.tar.gzHmm. What’s next... No README files... After some googling I found that there’s getting stated guide. Following it this should be the magic command:
$ tar xvzf qpid-incubating-M3.tar.gz
$ cd qpid-incubating-M3/
$ cd java/brokerOkay, let’s look for the missing file somewhere:
$ PATH=$PATH:bin QPID_HOME=$PWD ./bin/qpid-server -c etc/persistent_config.xml
./bin/qpid-server: line 37: qpid-run: No such file or directory
$ find ../.. -name qpid-runNext try:
../../java/common/bin/qpid-run
$ cp ../../java/common/bin/qpid-run bin
$ PATH=$PATH:bin QPID_HOME=$PWD ./bin/qpid-server -c etc/persistent_config.xmlThat’s enough. I’m definitely too stupid to run this server and too busy to waste more time here (there is also other reason why I’m not really interested in qpid, read on).
Setting QPID_WORK to /home/majek as default
System Properties set to -Damqj.logging.level=info -DQPID_HOME=/home/majek/b/qpid-incubating-M3/java/broker -DQPID_WORK=/home/majek
Using QPID_CLASSPATH /home/majek/b/qpid-incubating-M3/java/broker/lib/qpid-incubating.jar:/home/majek/b/qpid-incubating-M3/java/broker/lib/bdbstore-launch.jar
Info: QPID_JAVA_GC not set. Defaulting to JAVA_GC -XX:+UseConcMarkSweepGC -XX:+HeapDumpOnOutOfMemoryError
Info: QPID_JAVA_MEM not set. Defaulting to JAVA_MEM -Xmx1024m
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/qpid/server/Main
Caused by: java.lang.ClassNotFoundException: org.apache.qpid.server.Main
py-amqplib + RabbitMQ
Here installation went smoothly.
$ python easy_install amqplibThe broker:
$ wget http://www.rabbitmq.com/releases/rabbitmq-server/v1.4.0/rabbitmq-server_1.4.0-1_all.deb
$ sudo dpkg -i rabbitmq-server_1.4.0-1_all.deb
The good
I also downloaded the py-amqlib sources and found out that the examples are working out of the box:
~/amqplib-0.5/demo$ ./demo_send.py 1Most common way of using Barry’s library is to write software that only consumes data from AMQP. Use one event loop that blocks forever and when something happens on AMQP the callback is executed. I have slightly different use case. In the project I’m writing I use asynchronous programming. This means that I have one event loop that is not the AMQP event loop. I want to run AMQP stack every time something happened on a socket. This is how it would look like in pseudocode:
~/amqplib-0.5/demo$ ./demo_receive.py
while True:Qpid python client has only blocking interface, so it’s impossible to write code like that. Fortunately there’s a nonblocking client in Barry’s library. There’s even an example:
nonblocking_consume_amqp_events()
<magically_wait_for_activity_on_amqp_socket>
./nbdemo_receive.py
The bad
The demo doesn’t work correctly when there are more messages in the queue and fails not very nicely with this traceback:
Traceback (most recent call last):
[...]
File "build/bdist.linux-i686/egg/amqplib/nbclient_0_8.py", line 74, in write
P' read_buf='&!amq.ctag-FC3ET7kTcqy/A93gJYQWqw==6!amq.ctag-FC3ET7kTcqy/A93gJYQWqw==myfan2
Above bug is triggered by basic_ack. So I removed that command - I stopped sending message acknowledges. Now the order of delivering messages got broken:
['1', '1']If the problem is with acknowledges, then one could say to use no_ack on a channel and just disable acknowledges. Sorry, this also doesn’t work.
['2', '1', '2']
['3', '1', '2', '3']
['4', '1', '2', '3', '4']
I send ten messages:While I was working at this issue I was worried that my Python process is eating more and more memory. After some debugging I discovered that amqplib has very nice memory leaks (okay, reference cycles).
['a_0', 'a_1', 'a_2', 'a_3', 'a_4', 'a_5', 'a_6', 'a_7', 'a_8', 'a_9']
Then another ten:
['b_0', 'b_1', 'b_2', 'b_3', 'b_4', 'b_5', 'b_6', 'b_7', 'b_8', 'b_9']
I receive:
['b_0', 'b_1', 'b_2', 'b_3', 'b_4', 'b_5', 'b_6', 'b_7', 'b_8', 'b_9',
'a_0', 'a_1', 'a_2', 'a_3', 'a_4', 'a_5', 'a_6', 'a_7', 'a_8', 'a_9',
'b_0', 'b_1', 'b_2', 'b_3', 'b_4', 'b_5', 'b_6', 'b_7', 'b_8', 'b_9']
The ugly
After few days I finally managed to create nonblocking code that I needed. But it’s really ugly:
def my_nb_callback(ch):
raise MException
conn = nbamqp.NonBlockingConnection('localhost',
userid='guest', password='guest',
nb_callback=my_nb_callback, nb_sleep=0.0)
ch = conn.channel()
ch.access_request('/data', active=True, read=True)
ch.exchange_declare('myfans', 'fanout', auto_delete=True)
qname, _, _ = ch.queue_declare()
ch.queue_bind(qname, 'myfans')
msgs = []
def callback(msg):
msgs.append( msg )
ch.connection.sock.sock.setblocking(False)
ch.basic_consume(qname, callback=callback)
while True:
msgs = []
<magically_wait_for_data_on_ch.connection.sock.sock>
try:
nbamqp.nbloop([ch])
except MException:
pass
unique_msgs_filter = {}
unique_msgs = []
for msg in msgs:
msg.channel.basic_ack(msg.delivery_tag)
if msg.body not in unique_msgs_filter:
unique_msgs_filter[msg.body] = True
unique_msgs.append(msg.body)
print '%r ' % (unique_msgs)
Back to RabbitMQ
For this simple research Rabbit works perfectly just out of the box. For a moment I even forgot that python library is not all of the software involved. But when I wanted to see some more details about it, things became messy:
$ rabbitmqctl --helpWhich password would you like? Even if I know that, I’m not going to give any passwords away to see the help message, sorry. Next try:
Password:
$ rabbitmq-server --help
/usr/sbin/rabbitmq-server: 44: cannot create /var/log/rabbitmq/rabbit.log.bak: Permission denied
/usr/sbin/rabbitmq-server: 45: cannot create /var/log/rabbitmq/rabbit-sasl.log.bak: Permission denied
{error_logger,{{2008,11,21},{0,26,12}},"Protocol: ~p: register/listen error: ~p~n",["inet_tcp",einval]}
{error_logger,{{2008,11,21},{0,26,12}},crash_report,[[{pid,<0.22.0>},{registered_name,net_kernel},{error_info,{error,badarg}},{initial_call,{gen,init_it,[gen_server,<0.19.0>,<0.19.0>,{local,net_kernel},net_kernel,{rabbit,shortnames,15000},[]]}},{ancestors,[net_sup,kernel_sup,<0.9.0>]},{messages,[]},{links,[<0.19.0>]},{dictionary,[{longnames,false}]},{trap_exit,true},{status,running},{heap_size,377},{stack_size,21},{reductions,309}],[]]}
Conclusion
Even if the AMQP protocol is great, AMQP user stack still needs a lot of polish in my opinion.
Update #1:
rabbitmq-discuss thread