At MeetMe, we use Cobbler to bootstrap new servers and Chef for configuration management. With a bit of hacking we have Cobbler adding nodes into Chef so they configure upon first boot. Using Cobbler's trigger system we have two triggers that currently maintain nodes in Chef. I'd like to expand this to remove clients as well and to do so upon a rename of a machine, not just removal.
To add a node to Chef, there's a pre-install hook Python script that I hacked up a little to make work with the latest versions of Cobbler and Chef. The first thing I needed to do was set up knife to work as the cobbler user so the trigger could run. This is pretty straightforward; just make sure that the user Cobbler runs as has a ~/.chef directory and a working knife.rb configuration file.
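For reference, the cobbler user's knife.rb can be quite minimal. This is a sketch with placeholder values; the node name, key path and server URL here are assumptions, not our actual configuration:

```ruby
# ~/.chef/knife.rb for the user Cobbler runs as (placeholder values)
log_level        :info
log_location     STDOUT
node_name        'cobbler'
client_key       '/var/lib/cobbler/.chef/cobbler.pem'
chef_server_url  'https://chef.example.com'
```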
The unpleasant thing I found is that Cobbler's triggers now sit within the Cobbler package installation in Python. The trigger files in the Gist need to live in the site-packages/cobbler/modules directory (/usr/lib/python2.6/site-packages/cobbler/modules/ on our CentOS 6 Cobbler box). With the triggers in place and using management classes in Cobbler, we're now able to auto-configure machines upon boot, which removes a few steps from our process.
If you have any updates, fixes or suggestions on how to make this integration even better, please let me know!
The trigger files: https://gist.github.com/gmr/5339326
Posterous.com is finally shutting down (or has by now). I had moved from Tumblr to Posterous because I liked the Posterous view of blogging and content, and I thought that if I was able to send posts via email I'd blog more often. It turns out that how I add blog posts has nothing to do with how often I blog.
Anyway, as a result of Posterous shutting down, I decided it was time to bring my blog back under my control. I went through a minor selection process, deciding that I did not want to use a traditional blogging system but instead one of the newer static blog generation tools. I settled on Nikola, a Python-based static site generator.
The conversion from Posterous was fairly painless. It took a few hours to get my content (mostly) back in and working the way I wanted. First I converted the Posterous backup from WordPress format, importing it into Nikola using Nikola's built-in tool for doing so.
Unfortunately this left me with poorly formatted and unmaintainable HTML, so I then ran html2rst on the HTML files, converting them to Nikola's native reStructuredText format. Then I had to clean up the conversion post by post. One of my posts on using Sphinx autodoc did not convert well because it was documenting reStructuredText within reStructuredText, and I didn't have time to figure out exactly why it kept blowing up.
Unfortunately, because Posterous had its own comment system and I now have Disqus running my comments, I have lost my comment history. I don't know what I am going to do about that; the comments were in the backup that Posterous provided me.
Have you been wondering what the redis_git_dirty stat in the Redis INFO command is?
Me too, so I downloaded the source and started grepping. What I found was that it is a constant that is defined in the src/mkreleasehdr.sh file. The exact command run to gather the value is:
GIT_DIRTY=`git diff 2> /dev/null | wc -l`
What is it doing? It's counting the number of lines when running the git diff command against the redis source code.
The purpose? It looks like it's a development flag to indicate how different the compiled version of redis is from what the last git commit was.
Should you monitor it? Nope, it's a waste of bits. In theory, this should always be 0 in a production release.
I tend to write a fair amount of command-line applications in Python that more often than not are meant to run as daemons. I also tend to use the same patterns in doing so. At first I wrote daemonization code myself following the general pattern that can be found in many places. Then I discovered python-daemon, the reference implementation of PEP-3143.
For logging, I would often use the same code, cut-and-paste from one application to another. After digging into the logging documentation for Python 2.7, I decided that DictConfig in logging was for me, but I needed support in 2.6. I also wanted something I could install via pip instead of copying the code from 2.7's logging package and including it in my code. Thus logging-config was born. *Edit: I have now removed logging-config and moved to logutils thanks to a comment by Vinay Sajip below.
Today, I am releasing clihelper, a Python module that aims to make writing command-line applications and daemons in Python easier. It uses python-daemon and logging/logutils together with a YAML based configuration file to let one focus on writing the core application and not the details about how to deal with command-line option handling, configuration, logging and daemonization.
Getting started with clihelper is meant to be very straightforward; simply extend the clihelper.Controller class:
class MyApp(clihelper.Controller):

    def process(self):
        self._logger.info('Would be processing at the specified interval now')
Next, in the main guard of the Python module containing the MyApp class, call the clihelper.setup method, then call the clihelper.run method, passing in the class that will be used as your application controller:
if __name__ == '__main__':
    clihelper.setup('MyApp', 'MyApp is just a demo', '0.0.1')
    clihelper.run(MyApp)
Next, the configuration file should be created:
Application:
    wake_interval: 60

Daemon:
    user: myappuser
    group: myappgroup
    pidfile: /var/run/myapp.pid

Logging:
    version: 1
    formatters:
        verbose:
            format: '%(levelname) -10s %(asctime)s %(process)-6d %(processName) -15s %(name) -10s %(funcName) -20s: %(message)s'
            datefmt: '%Y-%m-%d %H:%M:%S'
    handlers:
        console:
            class: logging.StreamHandler
            formatter: verbose
            debug_only: True
    loggers:
        clihelper:
            handlers: [console]
            level: INFO
            propagate: true
        myapp:
            handlers: [console]
            level: DEBUG
            propagate: true
    disable_existing_loggers: true
    incremental: false
Now invoke your application via the command line. Try passing --help to see the base level options.
clihelper allows you to add your own command-line options and does not need to be interval based. Instead, one can use a blocking IOLoop or other similar concepts. In the class that extends clihelper.Controller, redefine the clihelper.Controller.run method. In addition, you'll likely want to extend the clihelper.Controller.cleanup method to tell the IOLoop to stop when the application has been signalled to stop. An example with the Tornado IOLoop may look something like:
import logging

import clihelper
from tornado import ioloop

LOGGER = logging.getLogger(__name__)


class Test(clihelper.Controller):

    def run(self):
        # Setup the socket and listen
        self.ioloop = ioloop.IOLoop.instance()
        try:
            self.ioloop.start()
        except KeyboardInterrupt:
            LOGGER.info('CTRL-C caught, shutting down')
            self.cleanup()
There are a few other options, so if you're inclined to try it out, I suggest reading the documentation and the code. I hope that someone else will find this useful. If you have any suggestions or improvements, please do not hesitate to send them my way.
In a search for a non-hacky way to disable the direct mapping of the keyboard's play/pause/next/previous keys to iTunes in OS X, I stumbled across the "Remote control daemon" (rcd). As it turns out, disabling the iTunes launch behavior for these keys is as easy as unloading rcd with the following command:
launchctl unload -w /System/Library/LaunchAgents/com.apple.rcd.plist
I've not run across any negative impact of doing so, and now when I use Enqueue, the buttons work just for it.
On Friday, Doug Hellmann of PyMOTW fame commented on Twitter that he'd be interested in the breakdown of laptop vendors for people attending PyCon, something I had been pondering that morning as well.
In an attempt to come up with an approximation of this number, this morning I kicked off Kismet to listen on the network for approximately 10 minutes while it collected 2,371 MAC addresses. Taking the data it collected, I wrote a script that looked up each MAC address it found to determine the manufacturer. I was surprised at the inconsistency of the company names in the MAC address vendor database. I cleaned up the multiple versions of manufacturer strings and ended up with data I could then easily chart.
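The lookup script itself is simple; here's a sketch of the approach, with a tiny hand-made OUI prefix table and alias map standing in for the real vendor database and my cleanup rules:

```python
from collections import Counter

# Toy OUI prefix table and alias map; the real script used the full MAC
# vendor database, which is where the inconsistent company names came from.
OUI_VENDORS = {
    '00:23:12': 'Apple, Inc',
    '00:26:BB': 'Apple Inc.',
    '00:24:D7': 'Intel Corporate',
}
ALIASES = {'Apple, Inc': 'Apple', 'Apple Inc.': 'Apple',
           'Intel Corporate': 'Intel'}


def vendor(mac):
    """Map a MAC address to a normalized manufacturer name."""
    name = OUI_VENDORS.get(mac.upper()[:8], 'Unknown')
    return ALIASES.get(name, name)


macs = ['00:23:12:aa:bb:cc', '00:26:bb:11:22:33', '00:24:d7:44:55:66']
counts = Counter(vendor(mac) for mac in macs)
print(counts.most_common())  # [('Apple', 2), ('Intel', 1)]
```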
The results confirmed the impression one gets just walking around and looking at what people are using:
A pretty strong showing of Apple devices. It appears that the lower end numbers are either mobile devices on the network or network gear.
I've been listening to a lot of the conversations at PyCon about asynchronous development. The basic sentiment I've picked up on is that Callback Passing Style (CPS) is not currently in favor in the Python community. This, in addition to the popular use of the BlockingConnection in Pika, has led me to think about how to plan Pika's future enhancements. After some conversation with Tony, I believe I have an outline that should appeal to Python developers while keeping Pika asynchronous at its core and retaining CPS. I think that CPS is very powerful and believe it's still very important to Pika's future.
After I release 0.9.5, I will start development on Pika 2.0, which will be an effort to create a more pythonic approach to using Pika while retaining the ability to use CPS and keeping it asynchronous at its core.
The roadmap for changes to Pika 2.0:
Backwards incompatible change that drops Python 2.4, 2.5 support
Add Python 3 support
Remove existing connection adapter system
Both behaviors available from the same API, calling the same classes and methods
API notation closer to the AMQP specification
basic.consume and basic.get will return a single object with a materialized view of the Method, Header and Body frames.
Build in support for specific broker types and pure AMQP 0-9-1.
Here's an example of what I expect Pika 2.0 to look like for non-CPS use. Note this is more of an idea of how it will work for someone using Pika than a spec or actual code:
import sys

from pika.rabbitmq import Connection
from pika import Basic
from pika import Channel
from pika import Exchange
from pika import Queue

# All the attributes can be passed in via constructor or assigned
connection = Connection()
connection.host = 'localhost'
connection.port = 5672
connection.user = 'guest'
connection.password = 'guest'
connection.vhost = '/'

# Not much new here
try:
    connection.open()
except pika.ConnectException as e:
    print "Could not connect: %s" % e
    sys.exit(0)

# Channel construction outside of connection context, instead pass
# the Connection in
channel = Channel()
try:
    channel.open(connection)
except pika.TimeoutException as e:
    print "Could not open a channel: %s" % e
except pika.ConnectionClosedException as e:
    print "Could not open a channel, the connection is closed"

# All the attributes can be passed in via constructor or assigned
exchange = Exchange(channel)
exchange.name = 'not_microsoft'
exchange.type = 'fanout'
exchange.durable = True
exchange.declare()

# All the attributes can be passed in via constructor or assigned
queue = Queue(channel)
queue.name = 'my_queue'
queue.auto_delete = False
queue.durable = True
queue.passive = False

# Declare the queue and expect a bool
if not queue.declare():
    raise Exception("Could not declare my queue")

# Print info about the queue that was mapped automatically when
# Queue.DeclareOk was received
print 'Queue "%s"' % queue.name
print '  Depth     : %i' % queue.message_count
print '  Consumers : %i' % queue.consumer_count

# Bind the queue
queue.bind(exchange=exchange, routing_key='not_microsoft.my_queue')

# Generator returning one type for a message
for message in Basic.consume(channel, routing_key='my_queue'):
    print 'Delivery Tag   : %s' % message.delivery_tag
    print 'Channel        : %i' % message.channel
    print 'Body Size      : %i' % len(message.body)
    print 'Properties'
    print '  Content-Type : %s' % message.properties.content_type
    print '  Timestamp    : %s' % message.properties.timestamp
    print '  User Id      : %s' % message.properties.user_id
    print '  App Id       : %s' % message.properties.app_id
print 'Body           : %s' % message.body
I am looking for feedback on this direction. Do these changes and the example make sense to existing Pika and RabbitMQ users? Would you change anything about this direction? What would you improve?
I've posted in the past about RabbitMQ and how to get started with it. I thought I'd follow up with something that's a little deeper, but still pretty entry level from a usage perspective. I've been spending a lot of time with RabbitMQ lately, as I'm now the active maintainer of Pika, the Python package for interfacing with it. Part of my diving deep into Pika has been becoming better versed in the AMQP standard.
AMQP, or the Advanced Message Queueing Protocol, is a protocol standard developed by a consortium of organizations and developers including Rabbit Technologies (now VMWare/Spring), RedHat, JPMorgan Chase, Cisco and others. While RabbitMQ supports multiple protocols including STOMP and HTTP, AMQP is its primary protocol and strongly defines RabbitMQ's functional behavior, even when using other protocols. The strong tie between RabbitMQ and AMQP has led me to the conclusion that to fully understand RabbitMQ, you first need to fully understand AMQP.
To that end, RabbitMQ has provided an excellent yet terse AMQP and API Quick Reference. For the purposes of the information below I will be referencing AMQP version 0-9-1 and RabbitMQ 2.3.1 only.
Going down the Rabbit Hole
In this post I wanted to focus on how consumers can work with RabbitMQ to ensure that when messages are not processed correctly in client applications, they do not get dropped on the floor. To get there, let me first explain the different connotations of the term "message" when talking about RabbitMQ and AMQP.
The AMQP protocol specifies the semantics for sending different types of protocol level messages by categorizing the type of message by class and method in Class.Method format. Examples of this include Queue.Declare and Basic.Publish. If you're a Pika user you may know these as Channel.queue_declare() and Channel.basic_publish(). When referring to a message in context to something in Class.Method format, I am referring to the protocol message.
The class Basic encompasses most of the functionality related to sending and receiving application level messages. When we use Basic.Publish we are constructing a protocol level message with the application level message as a payload. It is the application level message that is received by the consumer application as a combination of a header and body.
Instead of focusing on how different AMQP and RabbitMQ clients are used from a programmer's perspective, I will attempt to primarily reference AMQP protocol level constructs with few references to client implementations.
To illustrate the lifecycle of sending and receiving an application level message, the following steps take place when a client application uses the Basic.Consume method of receiving messages:

1. The publishing application constructs a Basic.Publish protocol message, embedding the application level message as its payload, and sends it to RabbitMQ.
Example of publishing a message with Pika:
properties = BasicProperties(timestamp=time.time(),
                             content_type="text/plain",
                             delivery_mode=1)
channel.basic_publish(exchange='example',
                      routing_key="example.test",
                      body=message,
                      properties=properties)
2. The server receives the Basic.Publish message and routes it into the appropriate queue as specified by the exchange and routing key.
3. Assuming that our client application is operating properly and RabbitMQ is ready to deliver it a message, when the message published in step #1 is ready to be delivered**, RabbitMQ constructs a Basic.Deliver message with the payload of the Basic.Publish message it received in step #2.
4. The consuming application's RabbitMQ client library receives the message, decodes it and surfaces three distinct parts of the protocol message to the consuming application:
   1. The Method frame defines the core information about the protocol message. This includes the channel number the message was delivered on and the delivery-tag for the message. The delivery-tag, in conjunction with the current connection and channel number, uniquely identifies the message when communicating with RabbitMQ.
   2. The Header frame defines the properties of the application message. In the case of our example from step #1, this includes the timestamp and content_type as well as properties about the body frame such as the content length.
   3. The Body frame is the raw application level message data as assigned to the body parameter of channel.basic_publish() in step #1.
5. Assuming that, when the client application set itself up as a consumer, it did not specify the "no-ack" option, the next step is for the client application to send a Basic.Ack message with the delivery-tag received in the method frame in step #4. This lets RabbitMQ know the client application has received the message and RabbitMQ may now delete the message from the queue.
Example of sending a message acknowledgment with Pika:
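A minimal sketch of an acknowledging consumer callback; the handle_delivery name is mine, while channel and method are the objects Pika hands to a Basic.Consume callback:

```python
def handle_delivery(channel, method, header, body):
    # ... process the application message in `body` here ...
    # Acknowledge with the delivery-tag so RabbitMQ can discard the message
    channel.basic_ack(delivery_tag=method.delivery_tag)
```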
* This workflow is based upon the Basic.Consume method of consuming messages; the flow is slightly different when using Basic.Get.
** I am not deeply familiar with the internal RabbitMQ application architecture and flow. Steps that illustrate what RabbitMQ is doing internally may not accurately represent the internal RabbitMQ flow and are only meant to illustrate what is happening at a logical level when interfacing with RabbitMQ.
Dealing with Rejection
With step #5 we have concluded the lifespan of a properly consumed message, and this covers the majority of consuming behavior except when there is an exception in processing. In the event that a client application is unable to process a message and is able to let RabbitMQ know of the error, it should respond with either a Basic.Reject or Basic.Nack message.
Basic.Reject is specified in AMQP 0-9-1 with conflicting rules. Due to this conflict, RabbitMQ implements Basic.Reject with a behavior that does not match the exact behavior from the specification. The specification states that the client should not use Basic.Reject as a way to keep the broker from redelivering the message to the rejecting client, and then in contradiction specifies that the broker should not redeliver the same message to a consumer who rejects it.
If the requeue parameter is True when calling Basic.Reject, RabbitMQ will treat the rejected message as a new message in the queue and may deliver it again to the consumer who rejected it. If the requeue parameter is False, RabbitMQ will delete the message from the queue. For more information on the Basic.Reject implementation in RabbitMQ, check out the RabbitMQ blog post on the subject.
Example rejecting a message with Pika:
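A minimal sketch of rejecting inside a consumer callback; is_valid is a hypothetical application-level check, not part of Pika:

```python
def is_valid(body):
    # Hypothetical application-level validation
    return body.startswith(b'{')


def handle_delivery(channel, method, header, body):
    if is_valid(body):
        channel.basic_ack(delivery_tag=method.delivery_tag)
    else:
        # requeue=False tells RabbitMQ to delete the message outright
        channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
```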
Basic.Nack is a RabbitMQ-only extension of the AMQP protocol and expands upon the Basic.Reject behavior in a few ways. The important distinction in this context is that Basic.Nack allows for multiple delivery-tags to be rejected in a single message. As a side note, Basic.Nack is also implemented in RabbitMQ as a mechanism for the server to reject a Basic.Publish message sent by a publishing client. An example use case for server-to-client Basic.Nack is when the user_id property of a message sent with Basic.Publish does not match the authenticated user for the connection.
Example of sending Basic.Nack with Pika:
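A minimal sketch of the multiple-tag form; with multiple=True, RabbitMQ negatively acknowledges every unacked delivery up to and including delivery_tag (the reject_up_to helper name is mine):

```python
def reject_up_to(channel, delivery_tag):
    # Reject this message and all prior unacknowledged deliveries on the
    # channel; requeue=False discards them instead of redelivering
    channel.basic_nack(delivery_tag=delivery_tag, multiple=True, requeue=False)
```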
Since RabbitMQ currently does not have a built-in dead-letter system for when requeue=False is used with either Basic.Reject or Basic.Nack, I have employed a dead-letter queue for rejected messages. When using this approach, I send a Basic.Reject message with requeue=False and then a Basic.Publish with the message to the dead-letter queue.
Resurfacing messages in limbo
Rejecting messages is a great way to deal with messages you knowingly can not process in your client application, but what happens when a message makes your consumer die unexpectedly? In any scenario where you can not issue a Basic.Ack, Basic.Nack or Basic.Reject to the RabbitMQ server and messages become stuck without being redelivered, you can tell RabbitMQ to redeliver them.
Once your application is consuming, you send a Basic.Recover message to RabbitMQ and it will start delivering the messages that have yet to be acknowledged. If you set the requeue flag to True, it will deliver to any consumer who is eligible for the message; if you set it to False, it will only try to redeliver to the consumer it originally sent it to, if it can.
Example of this when using Pika:
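A minimal sketch; the recover_unacked helper name is mine, and requeue=True asks RabbitMQ to requeue the unacknowledged messages for any eligible consumer:

```python
def recover_unacked(channel, requeue=True):
    # Ask RabbitMQ to redeliver messages that were delivered but never acked
    channel.basic_recover(requeue=requeue)
```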
Taking this ability to the next level, your application may want to issue a passive Queue.Declare, which will return the count of pending messages in the queue to your application. If you notice that you're not able to consume some messages and the pending message count stays consistent even when you're sitting mostly idle, you could issue a Basic.Recover message to try and unstick them.
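Sketched with Pika's blocking usage in mind; the pending_message_count helper is mine, and it assumes queue_declare returns the frame wrapping Queue.DeclareOk, whose method carries message_count:

```python
def pending_message_count(channel, queue_name):
    # passive=True raises an error if the queue doesn't exist instead of
    # creating it; Queue.DeclareOk reports the queue's message count
    frame = channel.queue_declare(queue=queue_name, passive=True)
    return frame.method.message_count
```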
I hope this was useful in understanding not only how to deal with failures in message processing, but also has given you a glimpse into the underlying AMQP protocol. Want to dive deeper? Check out RabbitMQ's 0-9-1 reference, which is clearer than the protocol spec. Did you not follow what I meant about a passive Queue.Declare? It's okay, I'd like to explain more. Let me know what parts of RabbitMQ and AMQP I should explain in more detail in a future blog post.
In working on the next version of Pika, I've been implementing functional and unit testing of the various bits of the code base. I ran into a weird bug while working on a test case for connecting and disconnecting from RabbitMQ. The way I structured the test is to iterate through each of Pika's adapters, doing a connect and disconnect test. What I encountered was that any time a test followed the AsyncoreConnection adapter, nosetests would hang. If I changed the order of the tests and put asyncore at the end, the tests would complete successfully.
After much frustration I decided to dig into the asyncore code to see if I could figure out what was going on. The first thing I noticed other than the coding style is that the code hasn't been updated in over 10 years, so I wasn't the first to stumble upon this bug. As such I started digging in the bug tracker on python.org and ran across http://bugs.python.org/issue10878 which is very recent (2011-01-10).
Teodor Georgiev had been seeing the same symptom as I was encountering and went as far as to isolate the reason. Since he pointed out the problem is when the read, write and both flags are empty lists, I decided to go look at the code in context.
asyncore.loop() has four parameters: timeout, use_poll, map, and count. The problem stood out when I saw that if you do not pass your own socket map into asyncore.loop, it will use the asyncore.socket_map dictionary created when you import asyncore. The odd thing to me about the default socket_map dictionary is that it doesn't seem to be maintained in the module.
The hack to get around the bug ended up being passing an empty list into the map parameter when calling asyncore.loop(), as it only checks to make sure you didn't pass in a value. When you do this, map has a value other than None and the while loops on lines 209 and 213 can exit because an empty list evaluates to False. An empty dictionary like socket_map will just sit in that loop and run forever.
The proper bug fix would be to change lines 209 and 213 to check len(map), not just map.
One of the points I've been working on refining in my talks on Tornado is that it's a "take what you need" framework. At myYearbook we took this to the next step and have been experimenting with building non-HTTP protocol servers on the tornado.ioloop module. This has some great advantages for rapid prototyping of servers while making available things like the async tornado.httpclient.
In essence, this is what I do for the Pika Tornado Adapter as well.
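The pattern is easy to sketch without Tornado installed; the stdlib selectors module below plays the role of the loop (tornado.ioloop.IOLoop.add_handler has the same register-a-callback-for-a-socket shape), and all the names here are mine:

```python
import selectors
import socket

# Register a callback to fire when a socket becomes readable, the same
# event-driven shape used when building non-HTTP servers on tornado.ioloop
sel = selectors.DefaultSelector()
server_end, client_end = socket.socketpair()
for sock in (server_end, client_end):
    sock.setblocking(False)

received = []


def on_readable(sock, mask):
    # Called by the loop when `sock` has data waiting
    received.append(sock.recv(1024))


sel.register(server_end, selectors.EVENT_READ, on_readable)
client_end.send(b'ping')

# One iteration of the event loop: dispatch the ready callbacks
for key, mask in sel.select(timeout=1):
    key.data(key.fileobj, mask)

print(received)  # [b'ping']
```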
I'll be giving a talk at PyCon 2011 in March on Tornado; I'll have to figure out a way to bring this point home in one slide or less.