API
Connect to a Broker
coroutine mooq.connect(host='localhost', port=5672, broker='rabbit')
Create a connection object and then connect to a broker.
Parameters:
- host (str) – the hostname of the broker you wish to connect to
- port (int) – the port of the broker you wish to connect to
- broker (str) – broker type. Currently supported broker types are “rabbit” for a RabbitMQ broker and “in_memory” for a broker that resides in memory (useful for unit testing)
Returns: an InMemoryConnection or RabbitMQConnection object
Todo: raises BrokerConnectionError if it cannot connect to the broker
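The flow described above (choose a connection class from the broker string, instantiate it, then connect) can be sketched with the standard library alone. This is not mooq's source: the Sketch* classes and the _CONNECTION_TYPES table are hypothetical stand-ins, and raising ValueError for an unknown broker type is an assumption, since this section does not say what happens in that case.

```python
import asyncio


class SketchConnection:
    """Hypothetical stand-in for a connection class; not part of mooq."""

    def __init__(self, *, host, port):
        self.host = host
        self.port = port
        self.connected = False

    async def connect(self):
        # A real transport would open a socket here; the sketch only flips a flag.
        self.connected = True


class SketchRabbitConnection(SketchConnection):
    pass


class SketchInMemoryConnection(SketchConnection):
    pass


# Maps the documented broker strings to the (hypothetical) connection classes.
_CONNECTION_TYPES = {
    "rabbit": SketchRabbitConnection,
    "in_memory": SketchInMemoryConnection,
}


async def connect(host="localhost", port=5672, broker="rabbit"):
    """Create a connection object, connect it, and return it."""
    try:
        conn_cls = _CONNECTION_TYPES[broker]
    except KeyError:
        # Assumption: the source does not describe handling of unknown types.
        raise ValueError(f"unsupported broker type: {broker!r}") from None
    conn = conn_cls(host=host, port=port)
    await conn.connect()
    return conn
```

The two-step shape (construct, then await connect()) mirrors the note on the connection classes that constructing one does not by itself connect to the broker.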
RabbitMQ Transport
class mooq.RabbitMQConnection(**kwargs)
Implementation of a connection to a RabbitMQ broker.
Parameters:
- host (str) – the hostname of the broker you wish to connect to
- port (int) – the port of the broker you wish to connect to
Note
You must call RabbitMQConnection.connect() to actually connect to the broker.
close()
Stop processing events and close the connection to the broker.
Raises: NotImplementedError
coroutine connect()
Connect to the RabbitMQ broker.
coroutine create_channel()
Create a channel for multiplexing the connection.
Returns: a RabbitMQChannel object
get_broker(*, host, port)
Get the Broker object associated with the connection.
Parameters:
- host (str) – the hostname of the broker
- port (int) – the port of the broker
Returns: a Broker object
coroutine process_events(num_cycles=None)
Receive messages from the RabbitMQ broker and schedule the associated callback coroutines.
Should be run as a task in your app rather than awaited directly.
Parameters: num_cycles (int|None) – the number of times to run the event processing loop. A value of None will cause events to be processed without a cycle limit.
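To illustrate why an unbounded processing loop belongs in a task, here is a minimal sketch of a loop with the same num_cycles contract. It uses only asyncio; the function body, the inbox queue and the on_message callback are hypothetical and do not reflect how mooq talks to RabbitMQ.

```python
import asyncio


async def process_events(inbox, callback, num_cycles=None):
    """Hypothetical event loop: one queued message is handled per cycle."""
    cycles = 0
    while num_cycles is None or cycles < num_cycles:
        msg = await inbox.get()
        # Schedule the callback instead of awaiting it, so a slow callback
        # does not hold up delivery of the next message.
        asyncio.ensure_future(callback(msg))
        cycles += 1
    # Yield once so callbacks scheduled in the final cycle get to run.
    await asyncio.sleep(0)


async def demo():
    received = []

    async def on_message(msg):
        received.append(msg)

    inbox = asyncio.Queue()
    for text in ("a", "b", "c"):
        inbox.put_nowait(text)

    # Start the loop as a task. With num_cycles=None it would never finish,
    # so awaiting it inline would block the caller forever.
    task = asyncio.ensure_future(process_events(inbox, on_message, num_cycles=2))
    await task  # safe here only because num_cycles bounds the loop
    return received, inbox.qsize()
```

With num_cycles=2 the sketch handles two of the three queued messages and leaves the third in the queue, which is the kind of bounded run that is convenient in tests.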
class mooq.RabbitMQChannel(*, internal_chan, loop)
Implementation of a RabbitMQ channel.
Parameters:
- internal_chan – the transport specific channel object to use
- loop – the event loop
Typically this class will be instantiated outside the main thread.
coroutine publish(*, exchange_name, msg, routing_key='')
Publish a message on the channel.
Parameters:
- exchange_name (str) – the name of the exchange to send the message to
- msg (str) – the message to send
- routing_key (str) – the routing key to associate with the message
coroutine register_consumer(queue_name=None, routing_keys=[''], *, exchange_name, exchange_type, callback)
Register a consumer on the RabbitMQ channel.
Parameters:
- exchange_name (str) – name of the exchange
- exchange_type (str) – type of the exchange. Accepted values are “direct”, “topic” or “fanout”
- queue_name (str|None) – name of the queue. If None, a name will be given automatically and the queue will be declared exclusive to the channel, meaning it will be deleted once the channel is closed.
- callback (coroutine) – the callback to run when a message is placed on the queue that matches one of the routing keys
- routing_keys ([str,]) – a list of keys to match against. A message will only be sent to a consumer if its routing key matches one or more of the routing keys listed
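What "matches" means depends on the exchange type. The sketch below follows the usual AMQP 0-9-1 conventions used by RabbitMQ (direct: exact match; fanout: routing key ignored; topic: dot-separated words where * stands for exactly one word and # for zero or more). The function names are hypothetical, and this section does not state these rules itself, so treat it as an illustration rather than a description of mooq's internals.

```python
def topic_matches(pattern, routing_key):
    """AMQP-style topic match: '*' is one word, '#' is zero or more words."""

    def match(p, k):
        if not p:
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' may absorb zero words, or absorb one word and stay active.
            return match(rest, k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        return (head == "*" or head == k[0]) and match(rest, k[1:])

    return match(pattern.split("."), routing_key.split("."))


def should_deliver(exchange_type, binding_keys, routing_key):
    """Return True if a message with routing_key reaches the consumer."""
    if exchange_type == "fanout":
        return True  # fanout exchanges ignore routing keys
    if exchange_type == "direct":
        return routing_key in binding_keys
    if exchange_type == "topic":
        return any(topic_matches(key, routing_key) for key in binding_keys)
    raise ValueError(f"unknown exchange type: {exchange_type!r}")
```

For example, a consumer bound with ["logs.*"] on a topic exchange would see "logs.error" but not "logs.app.error", while ["logs.#"] would see both.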
coroutine register_producer(*, exchange_name, exchange_type)
Register a producer on the channel by providing information to the broker about the exchange the channel is going to use.
Parameters:
- exchange_name (str) – name of the exchange
- exchange_type (str) – type of the exchange. Accepted values are “direct”, “topic” or “fanout”
Returns: None
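Putting the channel methods together, a publish/consume round trip might look like the sketch below. It uses only the signatures listed in this reference and assumes a RabbitMQ broker is already reachable at localhost:5672. The exchange name "log", the routing key "greetings", and the single-argument shape of the callback are assumptions; this section does not say what the callback receives.

```python
import asyncio


async def on_message(resp):
    # Assumption: the callback is passed a single response argument.
    # Its exact shape is not described in this section.
    print("received:", resp)


async def main():
    import mooq  # deferred import: only needed when main() actually runs

    conn = await mooq.connect(host="localhost", port=5672, broker="rabbit")
    chan = await conn.create_channel()

    # Tell the broker about the exchange before publishing to it.
    await chan.register_producer(exchange_name="log", exchange_type="direct")
    await chan.register_consumer(
        exchange_name="log",
        exchange_type="direct",
        routing_keys=["greetings"],
        callback=on_message,
    )
    await chan.publish(
        exchange_name="log", msg="Hello World!", routing_key="greetings"
    )

    # Event processing runs as a task; num_cycles bounds it so the task ends.
    task = asyncio.ensure_future(conn.process_events(num_cycles=1))
    await task
```

Running it would be a matter of calling asyncio.run(main()) with the broker up. In a long-lived application the process_events task would normally be left running with num_cycles=None instead of being awaited.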
class mooq.RabbitMQBroker(*, host, port)
Control an existing RabbitMQ broker on your machine.
Useful when performing integration testing of projects that depend on RabbitMQ.
The broker is added to the registry, where each broker is given a unique name of the form “host_port”.
Parameters:
- host (str) – the hostname of the broker you wish to connect to
- port (int) – the port of the broker you wish to connect to
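The “host_port” naming scheme can be pictured as a small dictionary-backed registry. The class below is a hypothetical sketch, not mooq's registry: the class name, its methods, and the choice to reject duplicate registrations are all assumptions made for illustration.

```python
class BrokerRegistry:
    """Hypothetical registry keyed by 'host_port'; not mooq's implementation."""

    def __init__(self):
        self._brokers = {}

    @staticmethod
    def name_for(host, port):
        # Build the unique registry name described above.
        return f"{host}_{port}"

    def add(self, *, host, port, broker):
        name = self.name_for(host, port)
        if name in self._brokers:
            # Assumption: a duplicate registration is treated as an error.
            raise KeyError(f"broker already registered: {name}")
        self._brokers[name] = broker
        return name

    def get(self, *, host, port):
        # Look a broker up by the same host and port it was registered with.
        return self._brokers[self.name_for(host, port)]
```

A lookup by host and port, as in get_broker(*, host, port) on the connection classes, then reduces to rebuilding the same key.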
coroutine close()
Close the broker.
Raises: NotImplementedError
coroutine run(is_running=None)
Restarts the RabbitMQ broker using a method derived from the TEST_DISTRIBUTION environment variable.
Parameters: is_running (future) – a future set to done once the broker is confirmed to be running
If TEST_DISTRIBUTION==“arch”, will try to restart RabbitMQ using the Linux systemctl command.
If TEST_DISTRIBUTION==“ubuntu”, will try to restart RabbitMQ using the Linux service command.
Will wait for 20 seconds after restarting before returning.
Raises: ValueError – if the TEST_DISTRIBUTION environment variable is not found
Note
The user who invokes this method will likely require sudo access to the Linux commands. This can be provided by editing the sudoers file.
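The selection logic described for run() can be sketched as a pure function that only builds the command and does not execute it. The function name is hypothetical, and the unit and service names ("rabbitmq", "rabbitmq-server") are illustrative guesses; this section names only the systemctl and service commands, not their arguments. Handling of values other than “arch” and “ubuntu” is likewise an assumption.

```python
import os


def restart_command(environ=None):
    """Pick a restart command from TEST_DISTRIBUTION (sketch only)."""
    environ = os.environ if environ is None else environ
    try:
        distro = environ["TEST_DISTRIBUTION"]
    except KeyError:
        raise ValueError(
            "TEST_DISTRIBUTION environment variable not found"
        ) from None

    # Assumption: the unit/service names below are illustrative guesses.
    if distro == "arch":
        return ["sudo", "systemctl", "restart", "rabbitmq"]
    if distro == "ubuntu":
        return ["sudo", "service", "rabbitmq-server", "restart"]
    # Assumption: the source does not say how other values are handled.
    raise ValueError(f"unsupported TEST_DISTRIBUTION: {distro!r}")
```

Keeping the choice of command separate from its execution makes the branch logic testable without sudo rights or a running broker.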
In Memory Transport
class mooq.InMemoryConnection(**kwargs)
Implementation of an in-memory connection to a broker.
Parameters:
- host (str) – the hostname of the broker you wish to connect to
- port (int) – the port of the broker you wish to connect to
Note
You must call InMemoryConnection.connect() to actually connect to the broker.
coroutine close()
Stop processing events and close the connection to the broker.
coroutine connect()
Connect to the InMemory broker.
coroutine create_channel()
Create a channel for multiplexing the connection.
Returns: an InMemoryChannel object
get_broker(*, host, port)
Get the Broker object associated with the connection.
Parameters:
- host (str) – the hostname of the broker
- port (int) – the port of the broker
Returns: a Broker object
coroutine process_events(num_cycles=None)
Receive messages from the broker and schedule the associated callback coroutines.
Parameters: num_cycles (int|None) – the number of times to run the event processing loop. A value of None will cause events to be processed without a cycle limit.
class mooq.InMemoryChannel(**kwargs)
Implementation of an in-memory broker channel.
Parameters:
- internal_chan – the transport specific channel object to use
- loop – the event loop
Typically this class will be instantiated outside the main thread.
coroutine publish(*, exchange_name, msg, routing_key='')
Publish a message on the channel.
Parameters:
- exchange_name (str) – the name of the exchange to send the message to
- msg (str) – the message to send
- routing_key (str) – the routing key to associate with the message
coroutine register_consumer(queue_name=None, routing_keys=[''], *, exchange_name, exchange_type, callback)
Register a consumer on the channel.
Parameters:
- exchange_name (str) – name of the exchange
- exchange_type (str) – type of the exchange. Accepted values are “direct”, “topic” or “fanout”
- queue_name (str|None) – name of the queue. If None, a name will be given automatically and the queue will be declared exclusive to the channel, meaning it will be deleted once the channel is closed.
- callback (coroutine) – the callback to run when a message is placed on the queue that matches one of the routing keys
- routing_keys ([str,]) – a list of keys to match against. A message will only be sent to a consumer if its routing key matches one or more of the routing keys listed
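The queue_name=None behaviour (automatic name, exclusive to the channel, deleted on close) can be modelled in a few lines. SketchChannel and the "auto-" naming scheme are hypothetical; the sketch shows only the lifecycle rule, not how either transport stores its queues.

```python
import uuid


class SketchChannel:
    """Hypothetical channel that tracks the queues it declared."""

    def __init__(self, broker_queues):
        # Shared dict standing in for the broker: queue name -> messages.
        self._broker_queues = broker_queues
        self._exclusive = set()

    def declare_queue(self, queue_name=None):
        if queue_name is None:
            # No name given: generate one and mark the queue exclusive.
            queue_name = f"auto-{uuid.uuid4().hex}"
            self._exclusive.add(queue_name)
        self._broker_queues.setdefault(queue_name, [])
        return queue_name

    def close(self):
        # Exclusive queues disappear with the channel; named queues remain.
        for name in self._exclusive:
            self._broker_queues.pop(name, None)
        self._exclusive.clear()
```

In this model a consumer that needs its queue to outlive the channel has to pass an explicit queue_name.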
coroutine register_producer(*, exchange_name, exchange_type)
Register a producer on the channel by providing information to the broker about the exchange the channel is going to use.
Parameters:
- exchange_name (str) – name of the exchange
- exchange_type (str) – type of the exchange. Accepted values are “direct”, “topic” or “fanout”