
Welcome to mooq¶
Latest Version: v 0.1.1
mooq is an asyncio compatible library for interacting with a RabbitMQ AMQP broker.
Features¶
- Uses asyncio. No more callback hell.
- Simplified and pythonic API to RabbitMQ
- Built on top of the proven pika library
- Comes with an in memory broker for unit testing projects that depend on RabbitMQ
Get It Now¶
$ pip install mooq
Just mooq it¶
Creating a connection:
conn = await mooq.connect(
host="localhost",
port=5672,
broker="rabbit")
Creating a channel of the connection:
chan = await conn.create_channel()
Registering a producer:
await chan.register_producer(
exchange_name="log",
exchange_type="direct")
Registering a consumer and associated callback:
async def yell_it(resp):
print(resp['msg'].upper())
await chan.register_consumer(
exchange_name="log",
exchange_type="direct",
routing_keys=["greetings","goodbyes"],
callback = yell_it)
Publishing a message:
await chan.publish(exchange_name="log",
msg="Hello World!",
routing_key="greetings")
Process messages asynchronously, running associated callbacks:
loop = asyncio.get_event_loop()
loop.create_task(conn.process_events())
User Guide¶
Tutorial¶
mooq is really useful for creating asyncio based microservices that talk to eachother. So let’s create an app that does just that.
Introducing “in2com”¶
With a real intercom, a person presses a button and says something. On the other end, connected by a long wire, is a speaker that receives the audio and amplifies it.
Our very own in2com app consists of two microservices:
- hello.py for publishing greetings at random intervals
- loud.py for receiving the greetings and logging them in uppercase
Before starting, make sure you have installed rabbitMQ and mooq on your machine. See Installation
hello.py¶
We are going to schedule three coroutines for our hello.py microservice:
- publish_randomly(): for sending “hello world!” to a RabbitMQ broker at random intervals of between 1 and 10 seconds.
- tick_every_second(): for regularly printing a “tick” to the console, similar to an intercom having a blinking green LED to let us know it is on
- main(): the entry point for running the microservice. It sets up the connection to the RabbitMQ broker and schedules the tick_every_second() and publish_randomly() coroutines.
The main() coroutine looks like this:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | import mooq
import asyncio
import random
async def main():
conn = await mooq.connect(
host="localhost",
port=5672,
broker="rabbit")
chan = await conn.create_channel()
await chan.register_producer(
exchange_name="in2com_log",
exchange_type="direct")
loop.create_task(tick_every_second())
loop.create_task(publish_randomly(chan))
|
Before we can publish messages to the broker, we first need to connect to it using the mooq.connect()
function. mooq will raise an exception if it cannot connect to the broker.
Once we have a connection, we can create a channel using the create_channel()
method of the conn object.
Channels enable many different producers and consumers to multiplex one connection to RabbitMQ. This is helpful because establishing a connection is generally an expensive operation. When using mooq, you should only have one producer or consumer per channel.
Once we have a channel, we can register a producer with the broker using the register_producer()
method of the chan object. This tells the broker to register a direct exchange called “in2com_log” if isn’t already registered. Publishing to a “direct” exchange ensures a message goes to the queues whose routing key exactly matches the routing key of the message. Exchanges in mooq can be either “direct”, “topic” or “fanout”.
The last two lines of main() schedule the other coroutines to run.
The publish_randomly() coroutine looks like this:
1 2 3 4 5 6 7 8 9 | async def publish_randomly(chan):
while True:
await chan.publish(
exchange_name="in2com_log",
msg="Hello World!",
routing_key="greetings")
print("published!")
await asyncio.sleep(random.randint(1,10))
|
In mooq messages are published at the channel level and messages are consumed at the connection level. We’ve found this fits in best with asyncio apps. Invoking chan.publish()
sends a “Hello World!” message with a routing key of “greetings” to the “in2com_log” exchange. Messages must be json serialisable.
If we tried to publish to an exchange that isn’t registered with the broker, an exception would’ve been raised.
The tick_every_second() coroutine is self explanatory:
1 2 3 4 5 6 | async def tick_every_second():
cnt = 0
while True:
print("tick hello {}".format(cnt))
cnt = cnt + 1
await asyncio.sleep(1)
|
Finally, to run the microservice from the command line, we add statements to get the event loop, schedule the main coroutine and then run the event loop:
loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()
Final hello.py source:
import mooq
import asyncio
import random
async def main():
conn = await mooq.connect(
host="localhost",
port=5672,
broker="rabbit")
chan = await conn.create_channel()
await chan.register_producer(
exchange_name="in2com_log",
exchange_type="direct")
loop.create_task(tick_every_second())
loop.create_task(publish_randomly(chan))
async def tick_every_second():
cnt = 0
while True:
print("tick hello {}".format(cnt))
cnt = cnt + 1
await asyncio.sleep(1)
async def publish_randomly(chan):
while True:
await chan.publish(
exchange_name="in2com_log",
msg="Hello World!",
routing_key="greetings")
print("published!")
await asyncio.sleep(random.randint(1,10))
loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()
loud.py¶
We are going to schedule three coroutines for our loud.py microservice:
- main(): the entry point for running the microservice. It sets up the connection to the RabbitMQ broker and schedules coroutines.
- process_events(): for scheduling coroutines to run on receiving messages
- tick_every_second(): for regularly printing a “tick” to the console, similar to an intercom having a blinking green LED to let us know it is on
The main() coroutine looks like this:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | import mooq
import asyncio
#the callback to run
async def yell_it(resp):
print(resp['msg'].upper())
async def main(loop):
conn = await mooq.connect(
host="localhost",
port=5672,
broker="rabbit")
chan = await conn.create_channel()
await chan.register_consumer(
exchange_name="in2com_log",
exchange_type="direct",
routing_keys=["greetings","goodbyes"],
callback = yell_it)
loop.create_task(tick_every_second())
loop.create_task(conn.process_events())
|
As per hello.py, we connect to the broker and create a channel to use. Next we register a consumer on the channel. As per register_producer()
, register_consumer()
tells the broker to register a direct exchange called “in2com_log” if isn’t already registered.
The routing_keys argument is a list of routing keys that we want to match against. If a message is published to the “in2com_log” exchange with either the “greetings” or “goodbyes” routing keys, then the broker will send the message to our channel. If a message were to be published with any other routing key, the channel not receive the message.
We instruct mooq to run the callback yell_it()
when a message is received. In mooq, callbacks are always coroutines with one argument - a response dictionary. This enables apps to be purely based in the asyncio world. The response dictionary for each callback contains the message sent as well as metadata such as the routing key it was sent with.
As per hello.py, we schedule the tick_every_second() coroutine to run.
Lastly, we schedule a task to run conn.process_events()
that listens for all messages being sent to all channels of the connection and runs the required callbacks. It bears repeating that in mooq, messages are published at the channel level and messages are consumed at the connection level.
conn.process_events()
should always run as a seperate task and not awaited for, as it is designed to run until explicitly stopped.
Finally, as per hello.py, to run the microservice from the command line, we add statements to get the event loop, schedule the main coroutine and then run the event loop:
loop = asyncio.get_event_loop()
loop.create_task(main(loop))
loop.run_forever()
Final loud.py source:
import mooq
import asyncio
#the callback to run
async def yell_it(resp):
print(resp['msg'].upper())
async def main(loop):
conn = await mooq.connect(
host="localhost",
port=5672,
broker="rabbit")
chan = await conn.create_channel()
await chan.register_consumer(
exchange_name="in2com_log",
exchange_type="direct",
routing_keys=["greetings","goodbyes"],
callback = yell_it)
loop.create_task(tick_every_second())
loop.create_task(conn.process_events())
async def tick_every_second():
cnt = 0
while True:
print("tick loud {}".format(cnt))
cnt = cnt + 1
await asyncio.sleep(1)
loop = asyncio.get_event_loop()
loop.create_task(main(loop))
loop.run_forever()
Running¶
Open up two tabs in your favourite terminal program.
Terminal 1:
$ python hello.py
tick hello 0
published!
tick hello 1
tick hello 2
published!
tick hello 3
tick hello 4
tick hello 5
published!
tick hello 6
Terminal 2:
$ python loud.py
tick loud 0
HELLO WORLD!
tick loud 1
tick loud 2
HELLO WORLD!
tick loud 3
tick loud 4
tick loud 5
HELLO WORLD!
tick loud 6
Examples¶
Direct¶
hello.py:
- Prints a ‘tick’ message every second and publishes messages to a RabbitMQ at the same time.
import mooq
import asyncio
import random
async def main():
conn = await mooq.connect(
host="localhost",
port=5672,
broker="rabbit")
chan = await conn.create_channel()
await chan.register_producer(
exchange_name="in2com_log",
exchange_type="direct")
loop.create_task(tick_every_second())
loop.create_task(publish_randomly(chan))
async def tick_every_second():
cnt = 0
while True:
print("tick hello {}".format(cnt))
cnt = cnt + 1
await asyncio.sleep(1)
async def publish_randomly(chan):
while True:
await chan.publish(
exchange_name="in2com_log",
msg="Hello World!",
routing_key="greetings")
print("published!")
await asyncio.sleep(random.randint(1,10))
loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()
loud.py:
- Prints a ‘tick’ message every second and processes messages from RabbitMQ at the same time.
import mooq
import asyncio
#the callback to run
async def yell_it(resp):
print(resp['msg'].upper())
async def main(loop):
conn = await mooq.connect(
host="localhost",
port=5672,
broker="rabbit")
chan = await conn.create_channel()
await chan.register_consumer(
exchange_name="in2com_log",
exchange_type="direct",
routing_keys=["greetings","goodbyes"],
callback = yell_it)
loop.create_task(tick_every_second())
loop.create_task(conn.process_events())
async def tick_every_second():
cnt = 0
while True:
print("tick loud {}".format(cnt))
cnt = cnt + 1
await asyncio.sleep(1)
loop = asyncio.get_event_loop()
loop.create_task(main(loop))
loop.run_forever()
Terminal 1:
$ python hello.py
tick hello 0
published!
tick hello 1
tick hello 2
published!
tick hello 3
tick hello 4
tick hello 5
published!
tick hello 6
Terminal 2:
$ python loud.py
tick loud 0
HELLO WORLD!
tick loud 1
tick loud 2
HELLO WORLD!
tick loud 3
tick loud 4
tick loud 5
HELLO WORLD!
tick loud 6
API¶
Connect to a Broker¶
-
coroutine
mooq.
connect
(host='localhost', port=5672, broker='rabbit')¶ Create a connection object and then connect to a broker
Parameters: - host (str) – the hostname of the broker you wish to connect to
- port (int) – the port of the broker you wish to connect to
- broker (str) – broker type. Currently supported broker types are “rabbit” for a RabbitMQ broker and “in_memory” for an broker that resides in memory (useful for unit testing)
Returns: InMemoryConnection
orRabbitMQConnection
objectTodo
raises BrokerConnectionError if cannot connect to the broker
RabbitMQ Transport¶
-
class
mooq.
RabbitMQConnection
(**kwargs)¶ Implementation of a connection to a RabbitMQ broker.
Parameters: - host (str) – the hostname of the broker you wish to connect to
- port (int) – the port of the broker you wish to connect to
Note
must call
RabbitMQConnection.connect()
to actually connect to the broker-
close
()¶ Stop processing events and close the connection to the broker
Raises: NotImplementedError –
-
coroutine
connect
()¶ Connect to the RabbitMQ broker
-
coroutine
create_channel
()¶ create a channel for multiplexing the connection
Returns: a RabbitMQChannel
object
-
get_broker
(*, host, port)¶ Get the
Broker
object associated with the connection.Parameters: - host (str) – the hostname of the broker
- port (int) – the port of the broker
Returns: A
Broker
object
-
coroutine
process_events
(num_cycles=None)¶ Receive messages from the RabbitMQ broker and schedule associated callback couroutines.
Should be run as a task in your app and not awaited for.
Parameters: num_cycles (int|None) – the number of times to run the event processing loop. A value of None will cause events to be processed without a cycle limit.
-
class
mooq.
RabbitMQChannel
(*, internal_chan, loop)¶ Implementation of a RabbitMQ Channel
Parameters: - internal_chan – the transport specific channel object to use
- loop – the event loop
Typically this class will be instantiated outside the main thread.
-
coroutine
publish
(*, exchange_name, msg, routing_key='')¶ Publish a message on the channel.
Parameters: - exchange_name (str) – The name of the exchange to send the message to
- msg (str) – The message to send
- routing_key (str) – The routing key to associated the message with
-
coroutine
register_consumer
(queue_name=None, routing_keys=[''], *, exchange_name, exchange_type, callback)¶ Register a consumer on the RabbitMQ channel.
Parameters: - exchange_name (str) – name of the exchange
- exchange_type (str) – Type of the exchange. Accepted values are “direct”, “topic” or “fanout”
- queue_name (str|None) – name of the queue. If None, a name will be given automatically and the queue will be declared exclusive to the channel, meaning it will be deleted once the channel is closed.
- callback (coroutine) – The callback to run when a message is placed on the queue that matches one of the routing keys
- routing_keys ([str,]) – A list of keys to match against. A message will only be sent to a consumer if its routing key matches one or more of the routing keys listed
-
coroutine
register_producer
(*, exchange_name, exchange_type)¶ Register a producer on the channel by providing information to the broker about the exchange the channel is going to use.
Parameters: - exchange_name (str) – name of the exchange
- exchange_type (str) – Type of the exchange. Accepted values are “direct”, “topic” or “fanout”
Returns: None
-
class
mooq.
RabbitMQBroker
(*, host, port)¶ Control an existing RabbitMQBroker on your machine.
Useful when performing integration testing of projects that depend on RabbitMQ.
Add the broker to the registry. Each broker is given a unique name of “host_port” in the registry.
Parameters: - host (str) – the hostname of the broker you wish to connect to
- port (int) – the port of the broker you wish to connect to
-
coroutine
close
()¶ close the broker
Raises: NotImplementedError
-
coroutine
run
(is_running=None)¶ Restarts the RabbitMQ broker using a method derived from the TEST_DISTRIBUTION environmental variable.
Parameters: is_running (future) – A future set to done once the broker is confirmed as being running If TEST_DISTRIBUTION==”arch”, will try to restart rabbitmq using the linux
systemctl
command.If TEST_DISTRIBUTION==”ubuntu”, will try to restart rabbitmq using the linux
service
command.Will wait for 20 seconds after restarting before returning.
Raises: ValueError – if TEST_DISTRIBUTION environmental variable not found Note
the user who invokes this method will likely require sudo access to the linux commands. This can be provided by editing the sudoers file.
In Memory Transport¶
-
class
mooq.
InMemoryConnection
(**kwargs)¶ Implementation of an in memory connection to a broker
Parameters: - host (str) – the hostname of the broker you wish to connect to
- port (int) – the port of the broker you wish to connect to
Note
must call
InMemoryConnection.connect()
to actually connect to the broker-
coroutine
close
()¶ Stop processing events and close the connection to the broker
-
coroutine
connect
()¶ Connect to the InMemory broker
-
coroutine
create_channel
()¶ create a channel for multiplexing the connection :returns: an
InMemoryChannel
object
-
get_broker
(*, host, port)¶ Get the
Broker
object associated with the connection.Parameters: - host (str) – the hostname of the broker
- port (int) – the port of the broker
Returns: A
Broker
object
-
coroutine
process_events
(num_cycles=None)¶ Receive messages from the broker and schedule associated callback couroutines.
Parameters: num_cycles (int|None) – the number of times to run the event processing loop. A value of None will cause events to be processed without a cycle limit.
-
class
mooq.
InMemoryChannel
(**kwargs)¶ Implementation of an in memory broker channel
Parameters: - internal_chan – the transport specific channel object to use
- loop – the event loop
Typically this class will be instantiated outside the main thread.
-
coroutine
publish
(*, exchange_name, msg, routing_key='')¶ Publish a message on the channel.
Parameters: - exchange_name (str) – The name of the exchange to send the message to
- msg (str) – The message to send
- routing_key (str) – The routing key to associated the message with
-
coroutine
register_consumer
(queue_name=None, routing_keys=[''], *, exchange_name, exchange_type, callback)¶ Register a consumer on the channel.
Parameters: - exchange_name (str) – name of the exchange
- exchange_type (str) – Type of the exchange. Accepted values are “direct”, “topic” or “fanout”
- queue_name (str|None) – name of the queue. If None, a name will be given automatically and the queue will be declared exclusive to the channel, meaning it will be deleted once the channel is closed.
- callback (coroutine) – The callback to run when a message is placed on the queue that matches one of the routing keys
- routing_keys ([str,]) – A list of keys to match against. A message will only be sent to a consumer if its routing key matches one or more of the routing keys listed
-
coroutine
register_producer
(*, exchange_name, exchange_type)¶ Register a producer on the channel by providing information to the broker about the exchange the channel is going to use.
Parameters: - exchange_name (str) – name of the exchange
- exchange_type (str) – Type of the exchange. Accepted values are “direct”, “topic” or “fanout”
Changelog¶
0.1.0 (2017-07-18)¶
- Initial release.
0.1.1 (2017-07-19)¶
- Fixed bug that prevented the queue_name argument of
Channel.register.consumer()
from not being truly optional.
License¶
MIT License
Copyright (c) 2017 Jeremy arr
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
Kudos¶
- The python tutorial on the RabbitMQ website is top notch was a great way to get started with AMQP.
- pika, a well thought out and full featured RabbitMQ python library
- Blog posts here and here on some simple ways to unit test asyncio code
Some other awesome projects doing similar things: