RabbitMQ – Pika

Short Tutorial on Rabbit MQM & PIKA

To tell you short Rabbit MQM is somewhat a Mail Server. If you still dont understand it, then think of it like a friend who safe keep the courier sent you when you are not available in office.

Technicall speaking, Rabbit is (1) an Open Source (2) Message Queue Broker.

Tutorial

MQM Definitions:

  • Producer: one who creates a job request.
  • Consumer: one who processer a job request.
  • Queue: one who stores request.
  • Exchange: Gate/Door for Queue.
  • RoutingKey: Key for accessing Queue via Exchange.

Generally, this is how people/process send work

producer -> consumer

In Reality, as consumers are not always standing for your instruction, so,.. having a queue system helps

producer -> stack/queue -> consumer

For some reasons, mostly a producer generally does not stick to one type of customer and starts creating multiple kind of products for different purposes and consumer

producer -> exchange_server -> Queue -> consumer

An Exchange Server, understand the producer and sends the requests to relavant queues.

Queue

I’m using macbook(good/bad), so here are my installation instructions

# Installing RabbitMQ
brew install rabbitmq

Rabbit uses erlang, so brew will install it. No need to specify special instruction.

Consumer

Now that we have a message broker/message queue manager, we need some body who will actually pick and do that tasks.

# pip install celery

Producer

We will celery module itself to create and send work.

Plan

  1. Created a simple example of a slow running job.
  2. create a queue
  3. created an exchange

1. Sample Working Python Program

import random
import time

def fibo(t):
    """Return the smallest fibonaci number less than given value."""
    a = 0
    b = 1
    while (b < t):
        a, b = b, a + b
    return a

def main(value):
    time.sleep(random.random() * 3)
    return fibo(value)

Testing fibo

&gt;&gt;&gt; ## testing
&gt;&gt;&gt; fibo(12), fibo(21), fibo(1112)
(8, 13, 987)

Testing Producer

## testing
main(12), main(21), main(1112)
(8, 13, 987)

2. Working RabbitMq

Experimental Commands

###################################################################
# rabbit mqm is a Message queue broker
#    It has Queues, Exchanges for Queues and
#           Routing Key that connect them.
#
#  Here are some commands to reset your MQM if wish to experiment
###################################################################

# starting mqm server
#     webpage     (http://localhost:15672/#/)
#     credentials (guest/guest)
rabbitmq-server

# to stop - stopping above does not always kill the background
#   so use this command.
kill `ps -eaf | grep -v grep | grep rabbit | awk '{ print $2 }'`

###################################################################
# rabbit mqm - resetting queues
###################################################################

# stop application
rabbitmqctl stop_app

# reset
rabbitmqctl reset

# start
rabbitmqctl start_app

# kill rabbit 
kill `ps -eaf | grep -v grep | grep rabbit | awk '{ print $2 }'`

###################################################################

If you have installed correct, you can run following command in your terminal

rabbitmq-server

The default rabbitmq server comes with a UI, and you can find it at following url.

Default credentials are username: guest and password: guest

1. created queues, exchanges and routing_keys

In exchange page, you can see the newly created test-exchange in the list of exchanges, click it.

Go to section, Add binding from this exchange and start adding following

  • To queue, test-queue1 and routing key is redpill
  • To queue, test-queue2 and routing key is bluepill

Let’s learn to send a request to a queue

Following shell code“

SDS-bash3.2$ cat create_request.py
from tasks import wrapped_main

if __name__ == "__main__":
    print(wrapped_main.delay(15))

Executing above code

SDS-bash3.2$ python create_request.py
94c77870-2262-4ccd-af22-b45326c3c920

In Rabbit MQM, we can see a message came to Celery from Queues Page.

Issue is, we need this message to our test-queue1.

# taking a backup copy
SDS-bash3.2$ cp -p tasks.py  tasks_old.py

# update the file
SDS-bash3.2$ vi tasks_old.py

# check the differences
SDS-bash3.2$ diff tasks.py  tasks_old.py
8c8
< @app.task(queue='test-queue1')
---
> @app.task

In Rabbit MQM, we can see a message came to test-queue1 from Queues Page.

Next Issue, we need to see this output message.

SDS-bash3.2$ cat tasks.py
# tasks.py

from celery import Celery
from kombu import Exchange, Queue
from main import main

pika_cred = pika.PlainCredentials('guest', 'guest')
pika_conn_params = pika.ConnectionParameters('localhost', credentials=pika_cred)
pika_connection = pika.BlockingConnection(pika_conn_params)
channel = pika_connection.channel()


######################################################################

# creating architecture - queues
channel.queue_declare(queue='test-queue1')
channel.queue_declare(queue='test-queue2')

# creating architecture - queues
channel.exchange_declare(exchange='test-exchange', exchange_type='direct')

# creating architecture - queues
channel.queue_bind(exchange='test-exchange', queue='test-queue1', routing_key='redpill')
channel.queue_bind(exchange='test-exchange', queue='test-queue2', routing_key='bluepill')

######################################################################

app = Celery("tasks", broker="pyamqp://guest@localhost//")

@app.task(queue='test-queue1')
def wrapped_main(value):
    return main(value)

Adding a backend process now.

<br />def pika_messager(message):
    """A messenger to send message to Queue."""
    header = "Host: " + str(os.uname()[1]) + "\tDatetime: " + str(time.ctime()) + '\n\n'
    print('start Pika Connection ' + header)
    pika_connection = pika.BlockingConnection(pika_conn_params)
    channel = pika_connection.channel()
    channel.basic_publish(exchange='variantCallerExchange', routing_key='variantCallerRoutingKey', body=header+message)
    channel.close()
    pika_connection.close()
    print('Pika Connection is close now')


@app.task(queue='test-queue1')
def wrapped_main(value):
    result = main(value)
    pika_messager("Job finished ! Result is" + result)
    return result

Complete Code & Summary

We have a main.py which our main script(product) which can do a certain job. We are usinga RabbitMq, a message queue manager(broker) to manage requests. We will use pika for creating requests, creating message(queues, exchanges, routing keys) and we will use celery for reporting completed jobs.

Note: Complete jobs means both success or failure.

A

filename: main.py

#! main.py
import random
import time

def fibo(t):
    """Return the smallest fibonaci number less than given value."""
    a = 0
    b = 1
    while (b < t):
        a, b = b, a + b
    return a

def main(value):
    time.sleep(random.random() * 10)
    return fibo(value)

filename: task.py

from celery import Celery
from kombu import Exchange, Queue
from main import main

pika_cred = pika.PlainCredentials('guest', 'guest')
pika_conn_params = pika.ConnectionParameters('localhost', credentials=pika_cred)
pika_connection = pika.BlockingConnection(pika_conn_params)
channel = pika_connection.channel()


######################################################################

# creating architecture - queues
channel.queue_declare(queue='test-queue1')
channel.queue_declare(queue='test-queue2')

# creating architecture - queues
channel.exchange_declare(exchange='test-exchange', exchange_type='direct')

# creating architecture - queues
channel.queue_bind(exchange='test-exchange', queue='test-queue1', routing_key='redpill')
channel.queue_bind(exchange='test-exchange', queue='test-queue2', routing_key='bluepill')

######################################################################

app = Celery("tasks", broker="pyamqp://guest@localhost//")

def pika_messager(message):
    """A messenger to send message to Queue."""
    header = "Host: " + str(os.uname()[1]) + "\tDatetime: " + str(time.ctime()) + '\n\n'
    print('start Pika Connection ' + header)
    pika_connection = pika.BlockingConnection(pika_conn_params)
    channel = pika_connection.channel()
    channel.basic_publish(exchange='variantCallerExchange', routing_key='variantCallerRoutingKey', body=header+message)
    channel.close()
    pika_connection.close()
    print('Pika Connection is close now')


@app.task(queue='test-queue1')
def wrapped_main(value):
    result = main(value)
    pika_messager("Job finished ! Result is" + result)
    return result

Testing

import os
import time
import pika

# Pika - result messages
pika_cred = pika.PlainCredentials('guest', 'guest')
pika_conn_params = pika.ConnectionParameters('localhost', credentials=pika_cred)

pika_connection = pika.BlockingConnection(pika_conn_params)
channel = pika_connection.channel()
pika_connection = pika.BlockingConnection(pika_conn_params)
channel.queue_declare(queue='test-queue1')

######################################################################
# sending messages
header = "Host: " + str(os.uname()[1]) + "\tDatetime: " + str(time.ctime()) + '\n\n'
channel.basic_publish(exchange='test-exchange', routing_key='bluepill', body=header + message)
######################################################################
Advertisements