
My First Fabric + Celery Cluster Project


A couple of weekends back, I spent some time kicking the tires of my new OpenStack cluster with a trivial distributed-processing application that calculates digits of pi using multiple worker machines.

[tl/dr: Screenshots are at the bottom.]

pzcluster.git

I wrote a small command-line script main.py to provision machine instances using the python-novaclient API for OpenStack and then configure these machines automatically using Fabric.
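As a rough sketch of what that provisioning step might look like with python-novaclient (the credentials, image name, flavor, and instance count below are placeholders, not the actual values from pzcluster.git):

provision_sketch.py (hypothetical illustration, not the real main.py)
from novaclient import client as nova_client

# Credentials would normally come from settings.py or environment variables.
nova = nova_client.Client('2', 'username', 'password', 'tenant_name',
                          'http://openstack.example.com:5000/v2.0/')

# Look up a base image and machine size by name (placeholder names).
image = nova.images.find(name='ubuntu-12.04')
flavor = nova.flavors.find(name='m1.small')

# Boot a handful of instances, keeping the handles so Fabric can configure them.
instances = [nova.servers.create(name='pzcluster-%i' % i, image=image, flavor=flavor)
             for i in range(4)]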

The machines automagically install dependency Ubuntu packages defined in fabfile.APT_PACKAGES and dependency Python libraries defined in fabfile.PIP_PACKAGES. Next, the machines git clone my Celery project defined in settings.GIT_URL and complete the tasks therein.
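The Fabric side would look roughly like the sketch below; the package lists and the task body are illustrative, not copied from the real fabfile.py.

fabfile.py (sketch)
from fabric.api import run, sudo, task

APT_PACKAGES = ['git', 'python-pip', 'rabbitmq-server']  # example packages only
PIP_PACKAGES = ['celery']                                 # example packages only

@task
def bootstrap(git_url):
    # Install OS-level and Python-level dependencies on the remote machine.
    sudo('apt-get update -q')
    sudo('apt-get install -y -q %s' % ' '.join(APT_PACKAGES))
    sudo('pip install %s' % ' '.join(PIP_PACKAGES))
    # Fetch the Celery project and start a worker against it.
    run('git clone %s pzcelery' % git_url)
    run('cd pzcelery && celery -A tasks worker --detach')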

pzcelery.git

pzcelery is my Celery project which is deployed to the machine instances.

Celery is an asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operation, but supports scheduling as well. The execution units, called tasks, are executed concurrently on a single or more worker servers.

This library is what lets a programmer consume a cluster effortlessly. One machine is designated as the broker and hosts a message queue; the other machines are designated as workers and process tasks pulled from that queue. Clusters are useful for computing problems that are too large for any one computer to solve (quickly) on its own.
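As a minimal sketch of how that split looks in Celery (the broker hostname and result backend below are assumptions, not pzcelery's actual configuration):

celeryapp.py (sketch)
from celery import Celery

# Workers and the client both point at the message queue on the broker machine.
celery = Celery('tasks',
                broker='amqp://guest@broker-hostname//',
                backend='amqp')

Each worker machine then runs something along the lines of celery -A tasks worker, while the client script can run from any machine that can reach the broker.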

The project itself comprises a tasks.py which defines the tasks and a client.py which dispatches tasks to workers. Here, I make some pi:

client.py
from tasks import make_pi
import time
import datetime

number_of_digits = 20


print "Client requesting pi digits calculated for significance of 1 to %i digits..." % number_of_digits

# Dispatch one task per digit count; each .delay() call returns an AsyncResult.
results = list(make_pi.delay(x) for x in range(1, number_of_digits + 1))

# Poll until every dispatched task has been collected, printing results as they arrive.
while results:
    time.sleep(2)
    finishers = filter(lambda x: x.ready(), results)
    for finisher in finishers:
        response = finisher.get()
        print "[%s] [Worker @ %s] pi: %s" % (datetime.datetime.now().strftime('%I:%M:%S%p'),
                                             response['hostname'],
                                             response['result'])
        results.remove(finisher)
tasks.py
from celery import Celery, current_task
import os

# ... (the Celery app instance, celery, and its broker settings are configured here)

@celery.task
def make_pi(number_of_digits):
    """ Calculates pi to the requested number of digits """
    # Spigot-style algorithm: produces decimal digits of pi one at a time.
    q, r, t, k, m, x = 1, 0, 1, 1, 3, 3
    ret = ''
    length = 0
    while length < number_of_digits:
        if 4 * q + r - t < m * t:
            # The next digit (m) is now certain; append it and advance the state.
            ret += str(m)
            q, r, t, k, m, x = 10*q, 10*(r-m*t), t, k, (10*(3*q+r))//t - 10*m, x
        else:
            q, r, t, k, m, x = q*k, (2*q+r)*x, t*x, k+1, (q*(7*k+2)+r*x)//(t*x), x+2
        length = len(ret)
    # Report which worker handled the task alongside the formatted result.
    return dict(hostname=current_task.request.hostname, result=ret[:1] + '.' + ret[1:])

Obviously, make_pi is a trivial demonstration task. However, as the GIT_URL is user-defined in settings.py, I will be able to re-use pzcluster.git to run other compute projects in the future, beyond pzcelery.git. This Celery project was merely a proof-of-concept before bigger and better things to come.
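For example, pointing the cluster at a different compute project would presumably come down to editing a line like the following in settings.py (the URL shown is a placeholder, not a real repository):

settings.py (sketch)
GIT_URL = 'git@github.com:youruser/your-celery-project.git'  # any Celery project to deploy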

Repositories

pzcluster & pzcelery Album
