AMQP? Maybe not yet. At least not with Python.
In my last Python project I thought about using AMQP messaging. A quick look at Python AMQP libraries shows two possibilities:
- Apache Qpid python client
- Barry’s py-amqplib
Apache Qpid
I started my adventure by downloading the source. They don’t have binary packages so this is what’s needed:
$ wget http://www.apache.org/dist/incubator/qpid/M3-incubating/qpid-incubating-M3.tar.gzHmm. What’s next... No README files... After some googling I found that there’s getting stated guide. Following it this should be the magic command:
$ tar xvzf qpid-incubating-M3.tar.gz
$ cd qpid-incubating-M3/
$ cd java/brokerOkay, let’s look for the missing file somewhere:
$ PATH=$PATH:bin QPID_HOME=$PWD ./bin/qpid-server -c etc/persistent_config.xml
./bin/qpid-server: line 37: qpid-run: No such file or directory
$ find ../.. -name qpid-runNext try:
../../java/common/bin/qpid-run
$ cp ../../java/common/bin/qpid-run bin
$ PATH=$PATH:bin QPID_HOME=$PWD ./bin/qpid-server -c etc/persistent_config.xmlThat’s enough. I’m definitely too stupid to run this server and too busy to waste more time here (there is also other reason why I’m not really interested in qpid, read on).
Setting QPID_WORK to /home/majek as default
System Properties set to -Damqj.logging.level=info -DQPID_HOME=/home/majek/b/qpid-incubating-M3/java/broker -DQPID_WORK=/home/majek
Using QPID_CLASSPATH /home/majek/b/qpid-incubating-M3/java/broker/lib/qpid-incubating.jar:/home/majek/b/qpid-incubating-M3/java/broker/lib/bdbstore-launch.jar
Info: QPID_JAVA_GC not set. Defaulting to JAVA_GC -XX:+UseConcMarkSweepGC -XX:+HeapDumpOnOutOfMemoryError
Info: QPID_JAVA_MEM not set. Defaulting to JAVA_MEM -Xmx1024m
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/qpid/server/Main
Caused by: java.lang.ClassNotFoundException: org.apache.qpid.server.Main
py-amqplib + RabbitMQ
Here installation went smoothly.
$ python easy_install amqplibThe broker:
$ wget http://www.rabbitmq.com/releases/rabbitmq-server/v1.4.0/rabbitmq-server_1.4.0-1_all.deb
$ sudo dpkg -i rabbitmq-server_1.4.0-1_all.deb
The good
I also downloaded the py-amqlib sources and found out that the examples are working out of the box:
~/amqplib-0.5/demo$ ./demo_send.py 1Most common way of using Barry’s library is to write software that only consumes data from AMQP. Use one event loop that blocks forever and when something happens on AMQP the callback is executed. I have slightly different use case. In the project I’m writing I use asynchronous programming. This means that I have one event loop that is not the AMQP event loop. I want to run AMQP stack every time something happened on a socket. This is how it would look like in pseudocode:
~/amqplib-0.5/demo$ ./demo_receive.py
while True:Qpid python client has only blocking interface, so it’s impossible to write code like that. Fortunately there’s a nonblocking client in Barry’s library. There’s even an example:
nonblocking_consume_amqp_events()
<magically_wait_for_activity_on_amqp_socket>
./nbdemo_receive.py
The bad
The demo doesn’t work correctly when there are more messages in the queue and fails not very nicely with this traceback:
Traceback (most recent call last):
[...]
File "build/bdist.linux-i686/egg/amqplib/nbclient_0_8.py", line 74, in write
P' read_buf='&!amq.ctag-FC3ET7kTcqy/A93gJYQWqw==6!amq.ctag-FC3ET7kTcqy/A93gJYQWqw==myfan2
Above bug is triggered by basic_ack. So I removed that command - I stopped sending message acknowledges. Now the order of delivering messages got broken:
['1', '1']If the problem is with acknowledges, then one could say to use no_ack on a channel and just disable acknowledges. Sorry, this also doesn’t work.
['2', '1', '2']
['3', '1', '2', '3']
['4', '1', '2', '3', '4']
I send ten messages:While I was working at this issue I was worried that my Python process is eating more and more memory. After some debugging I discovered that amqplib has very nice memory leaks (okay, reference cycles).
['a_0', 'a_1', 'a_2', 'a_3', 'a_4', 'a_5', 'a_6', 'a_7', 'a_8', 'a_9']
Then another ten:
['b_0', 'b_1', 'b_2', 'b_3', 'b_4', 'b_5', 'b_6', 'b_7', 'b_8', 'b_9']
I receive:
['b_0', 'b_1', 'b_2', 'b_3', 'b_4', 'b_5', 'b_6', 'b_7', 'b_8', 'b_9',
'a_0', 'a_1', 'a_2', 'a_3', 'a_4', 'a_5', 'a_6', 'a_7', 'a_8', 'a_9',
'b_0', 'b_1', 'b_2', 'b_3', 'b_4', 'b_5', 'b_6', 'b_7', 'b_8', 'b_9']
The ugly
After few days I finally managed to create nonblocking code that I needed. But it’s really ugly:
def my_nb_callback(ch):
raise MException
conn = nbamqp.NonBlockingConnection('localhost',
userid='guest', password='guest',
nb_callback=my_nb_callback, nb_sleep=0.0)
ch = conn.channel()
ch.access_request('/data', active=True, read=True)
ch.exchange_declare('myfans', 'fanout', auto_delete=True)
qname, _, _ = ch.queue_declare()
ch.queue_bind(qname, 'myfans')
msgs = []
def callback(msg):
msgs.append( msg )
ch.connection.sock.sock.setblocking(False)
ch.basic_consume(qname, callback=callback)
while True:
msgs = []
<magically_wait_for_data_on_ch.connection.sock.sock>
try:
nbamqp.nbloop([ch])
except MException:
pass
unique_msgs_filter = {}
unique_msgs = []
for msg in msgs:
msg.channel.basic_ack(msg.delivery_tag)
if msg.body not in unique_msgs_filter:
unique_msgs_filter[msg.body] = True
unique_msgs.append(msg.body)
print '%r ' % (unique_msgs)
Back to RabbitMQ
For this simple research Rabbit works perfectly just out of the box. For a moment I even forgot that python library is not all of the software involved. But when I wanted to see some more details about it, things became messy:
$ rabbitmqctl --helpWhich password would you like? Even if I know that, I’m not going to give any passwords away to see the help message, sorry. Next try:
Password:
$ rabbitmq-server --help
/usr/sbin/rabbitmq-server: 44: cannot create /var/log/rabbitmq/rabbit.log.bak: Permission denied
/usr/sbin/rabbitmq-server: 45: cannot create /var/log/rabbitmq/rabbit-sasl.log.bak: Permission denied
{error_logger,{{2008,11,21},{0,26,12}},"Protocol: ~p: register/listen error: ~p~n",["inet_tcp",einval]}
{error_logger,{{2008,11,21},{0,26,12}},crash_report,[[{pid,<0.22.0>},{registered_name,net_kernel},{error_info,{error,badarg}},{initial_call,{gen,init_it,[gen_server,<0.19.0>,<0.19.0>,{local,net_kernel},net_kernel,{rabbit,shortnames,15000},[]]}},{ancestors,[net_sup,kernel_sup,<0.9.0>]},{messages,[]},{links,[<0.19.0>]},{dictionary,[{longnames,false}]},{trap_exit,true},{status,running},{heap_size,377},{stack_size,21},{reductions,309}],[]]}
Conclusion
Even if the AMQP protocol is great, AMQP user stack still needs a lot of polish in my opinion.
Update #1:
rabbitmq-discuss thread
4 comments:
there's also txAMQP - protocol implementation for twisted (https://code.launchpad.net/txamqp)
um... just start rabbitmqctl as root
and it won't prompt for a pw, and will print a helpful usage message.
manually starting a daemon that is already running, will cause the error
messages you showed. nothing wrong there.
the py-amqlib is explicitly single threaded. If that's not what you want, then you're using the wrong client.
multi-threaded event driven, you probably want something like the twisted client mentioned in previous post.
> um... just start rabbitmqctl as root
> manually starting a daemon that is
> already running, will cause the
> error messages you showed. nothing
> wrong there.
That's not the point. I don't say that software is buggy. I just think that mature software has a bit more meaningful error messages.
> the py-amqlib is explicitly
> single threaded. If that's not
> what you want, then you're using
> the wrong client.
I just think that libraries in general should allow you to use them for different contexts. That's the point of creating library - to make easy to use *reusable* code.
Creating a library that's tied to your use case leads to extending problems. Py-amqp is fighting with it right now. The nonblockingclient, which in my opion is (was?) buggy is trying to extend core library, and it's not very easy if the library is designed with only one usage in mind.
> multi-threaded event driven, you
> probably want something like the
> twisted client mentioned in
> previous post.
Thanks for bringing this library to my attention. I'll definitely take a look.
for QPID, try downloading the actual Java Broker release and not the full release. There seems to be a problem with package contents.
Post a Comment