diff -Nru python-aioamqp-0.11.0/aioamqp/channel.py python-aioamqp-0.12.0/aioamqp/channel.py --- python-aioamqp-0.11.0/aioamqp/channel.py 2018-06-20 15:33:33.000000000 +0000 +++ python-aioamqp-0.12.0/aioamqp/channel.py 2018-11-30 16:44:20.000000000 +0000 @@ -26,6 +26,7 @@ self.channel_id = channel_id self.consumer_queues = {} self.consumer_callbacks = {} + self.cancellation_callbacks = [] self.return_callback = return_callback self.response_future = None self.close_event = asyncio.Event(loop=self._loop) @@ -476,7 +477,6 @@ @asyncio.coroutine def basic_publish(self, payload, exchange_name, routing_key, properties=None, mandatory=False, immediate=False): - assert payload, "Payload cannot be empty" if isinstance(payload, str): warnings.warn("Str payload support will be removed in next release", DeprecationWarning) payload = payload.encode() @@ -651,6 +651,12 @@ _no_wait = frame.payload_decoder.read_bit() self.cancelled_consumers.add(consumer_tag) logger.info("consume cancelled received") + for callback in self.cancellation_callbacks: + try: + yield from callback(self, consumer_tag) + except Exception as error: # pylint: disable=broad-except + logger.error("cancellation callback %r raised exception %r", + callback, error) @asyncio.coroutine def basic_cancel(self, consumer_tag, no_wait=False): @@ -816,7 +822,6 @@ @asyncio.coroutine def publish(self, payload, exchange_name, routing_key, properties=None, mandatory=False, immediate=False): - assert payload, "Payload cannot be empty" if isinstance(payload, str): warnings.warn("Str payload support will be removed in next release", DeprecationWarning) payload = payload.encode() @@ -881,3 +886,15 @@ fut = self._get_waiter('confirm_select') fut.set_result(True) logger.debug("Confirm selected") + + def add_cancellation_callback(self, callback): + """Add a callback that is invoked when a consumer is cancelled. + + :param callback: function to call + + `callback` is called with the channel and consumer tag as positional + parameters. The callback can be either a plain callable or an + asynchronous co-routine. + + """ + self.cancellation_callbacks.append(callback) diff -Nru python-aioamqp-0.11.0/aioamqp/frame.py python-aioamqp-0.12.0/aioamqp/frame.py --- python-aioamqp-0.11.0/aioamqp/frame.py 2018-06-20 15:33:33.000000000 +0000 +++ python-aioamqp-0.12.0/aioamqp/frame.py 2018-11-30 16:44:20.000000000 +0000 @@ -92,8 +92,12 @@ self.payload.write(b'F') self.write_table(value) elif isinstance(value, int): - self.payload.write(b'I') - self.write_long(value) + if value.bit_length() >= 32: + self.payload.write(b'L') + self.write_long_long(value) + else: + self.payload.write(b'I') + self.write_long(value) elif isinstance(value, float): self.payload.write(b'd') self.write_float(value) diff -Nru python-aioamqp-0.11.0/aioamqp/tests/testcase.py python-aioamqp-0.12.0/aioamqp/tests/testcase.py --- python-aioamqp-0.11.0/aioamqp/tests/testcase.py 2018-06-20 15:33:33.000000000 +0000 +++ python-aioamqp-0.12.0/aioamqp/tests/testcase.py 2018-11-30 16:44:20.000000000 +0000 @@ -85,7 +85,7 @@ self.port = os.environ.get('AMQP_PORT', 5672) self.vhost = os.environ.get('AMQP_VHOST', self.VHOST + str(uuid.uuid4())) self.http_client = pyrabbit.api.Client( - 'localhost:15672/api/', 'guest', 'guest', timeout=20 + '{HOST}:15672/api/'.format(HOST=self.host), 'guest', 'guest', timeout=20 ) self.amqps = [] @@ -210,7 +210,7 @@ except asyncio.TimeoutError: logger.warning('Timeout on queue %s deletion', full_queue_name, exc_info=True) except Exception: # pylint: disable=broad-except - logger.error('Unexpected error on queue %s deletion', full_queue_name, exc_info=True) + logger.exception('Unexpected error on queue %s deletion', full_queue_name) @asyncio.coroutine def safe_exchange_delete(self, exchange_name, channel=None): @@ -225,7 +225,7 @@ except asyncio.TimeoutError: logger.warning('Timeout on exchange %s deletion', full_exchange_name, exc_info=True) except Exception: # pylint: disable=broad-except - logger.error('Unexpected error on exchange %s deletion', full_exchange_name, exc_info=True) + logger.exception('Unexpected error on exchange %s deletion', full_exchange_name) def full_name(self, name): if self.is_full_name(name): diff -Nru python-aioamqp-0.11.0/aioamqp/tests/test_connect.py python-aioamqp-0.12.0/aioamqp/tests/test_connect.py --- python-aioamqp-0.11.0/aioamqp/tests/test_connect.py 2018-06-20 15:33:33.000000000 +0000 +++ python-aioamqp-0.12.0/aioamqp/tests/test_connect.py 2018-11-30 16:44:20.000000000 +0000 @@ -13,7 +13,7 @@ @testing.coroutine def test_connect(self): - _transport, proto = yield from connect(virtualhost=self.vhost, loop=self.loop) + _transport, proto = yield from connect(host=self.host, port=self.port, virtualhost=self.vhost, loop=self.loop) self.assertEqual(proto.state, OPEN) self.assertIsNotNone(proto.server_properties) yield from proto.close() @@ -25,6 +25,8 @@ channel_max = 10 heartbeat = 100 _transport, proto = yield from connect( + host=self.host, + port=self.port, virtualhost=self.vhost, loop=self.loop, channel_max=channel_max, @@ -48,8 +50,8 @@ @testing.coroutine def test_socket_nodelay(self): - transport, proto = yield from connect(virtualhost=self.vhost, loop=self.loop) + transport, proto = yield from connect(host=self.host, port=self.port, virtualhost=self.vhost, loop=self.loop) sock = transport.get_extra_info('socket') opt_val = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) - self.assertEqual(opt_val, 1) + self.assertNotEqual(opt_val, 0) yield from proto.close() diff -Nru python-aioamqp-0.11.0/aioamqp/tests/test_protocol.py python-aioamqp-0.12.0/aioamqp/tests/test_protocol.py --- python-aioamqp-0.11.0/aioamqp/tests/test_protocol.py 2018-06-20 15:33:33.000000000 +0000 +++ python-aioamqp-0.12.0/aioamqp/tests/test_protocol.py 2018-11-30 16:44:20.000000000 +0000 @@ -19,7 +19,7 @@ @testing.coroutine def test_connect(self): - _transport, protocol = yield from amqp_connect(virtualhost=self.vhost, loop=self.loop) + _transport, protocol = yield from amqp_connect(host=self.host, port=self.port, virtualhost=self.vhost, loop=self.loop) self.assertEqual(protocol.state, OPEN) yield from protocol.close() @@ -30,6 +30,8 @@ 'program_version': '0.1.1', } _transport, protocol = yield from amqp_connect( + host=self.host, + port=self.port, virtualhost=self.vhost, client_properties=client_properties, loop=self.loop, @@ -41,11 +43,11 @@ @testing.coroutine def test_connection_unexistant_vhost(self): with self.assertRaises(exceptions.AmqpClosedConnection): - yield from amqp_connect(virtualhost='/unexistant', loop=self.loop) + yield from amqp_connect(host=self.host, port=self.port, virtualhost='/unexistant', loop=self.loop) def test_connection_wrong_login_password(self): with self.assertRaises(exceptions.AmqpClosedConnection): - self.loop.run_until_complete(amqp_connect(login='wrong', password='wrong', loop=self.loop)) + self.loop.run_until_complete(amqp_connect(host=self.host, port=self.port, login='wrong', password='wrong', loop=self.loop)) @testing.coroutine def test_connection_from_url(self): diff -Nru python-aioamqp-0.11.0/aioamqp/tests/test_publish.py python-aioamqp-0.12.0/aioamqp/tests/test_publish.py --- python-aioamqp-0.11.0/aioamqp/tests/test_publish.py 2018-06-20 15:33:33.000000000 +0000 +++ python-aioamqp-0.12.0/aioamqp/tests/test_publish.py 2018-11-30 16:44:20.000000000 +0000 @@ -24,6 +24,21 @@ self.assertEqual(1, queues["q"]['messages']) @testing.coroutine + def test_empty_publish(self): + # declare + yield from self.channel.queue_declare("q", exclusive=True, no_wait=False) + yield from self.channel.exchange_declare("e", "fanout") + yield from self.channel.queue_bind("q", "e", routing_key='') + + # publish + yield from self.channel.publish("", "e", routing_key='') + + queues = self.list_queues() + self.assertIn("q", queues) + self.assertEqual(1, queues["q"]["messages"]) + self.assertEqual(0, queues["q"]["message_bytes"]) + + @testing.coroutine def test_big_publish(self): # declare yield from self.channel.queue_declare("q", exclusive=True, no_wait=False) diff -Nru python-aioamqp-0.11.0/aioamqp/tests/test_queue.py python-aioamqp-0.12.0/aioamqp/tests/test_queue.py --- python-aioamqp-0.11.0/aioamqp/tests/test_queue.py 2018-06-20 15:33:33.000000000 +0000 +++ python-aioamqp-0.12.0/aioamqp/tests/test_queue.py 2018-11-30 16:44:20.000000000 +0000 @@ -51,6 +51,20 @@ self.assertEqual(result['queue'].split('.')[-1], queue_name) @testing.coroutine + def test_queue_declare_custom_x_message_ttl_32_bits(self): + queue_name = 'queue_name' + # 2147483648 == 10000000000000000000000000000000 + # in binary, meaning it is 32 bit long + x_message_ttl = 2147483648 + result = yield from self.channel.queue_declare('queue_name', arguments={ + 'x-message-ttl': x_message_ttl + }) + self.assertEqual(result['message_count'], 0) + self.assertEqual(result['consumer_count'], 0) + self.assertEqual(result['queue'].split('.')[-1], queue_name) + self.assertTrue(result) + + @testing.coroutine def test_queue_declare_passive_nonexistant_queue(self): queue_name = 'queue_name' with self.assertRaises(exceptions.ChannelClosed) as cm: diff -Nru python-aioamqp-0.11.0/aioamqp/tests/test_server_basic_cancel.py python-aioamqp-0.12.0/aioamqp/tests/test_server_basic_cancel.py --- python-aioamqp-0.11.0/aioamqp/tests/test_server_basic_cancel.py 2018-06-20 15:33:33.000000000 +0000 +++ python-aioamqp-0.12.0/aioamqp/tests/test_server_basic_cancel.py 2018-11-30 16:44:20.000000000 +0000 @@ -3,21 +3,72 @@ """ -import unittest +import asyncio +import unittest.mock +import uuid from . import testcase from . import testing +@asyncio.coroutine +def consumer(channel, body, envelope, properties): + yield from channel.basic_client_ack(envelope.delivery_tag) + + class ServerBasicCancelTestCase(testcase.RabbitTestCase, unittest.TestCase): _multiprocess_can_split_ = True + def setUp(self): + super().setUp() + self.queue_name = str(uuid.uuid4()) + @testing.coroutine def test_cancel_whilst_consuming(self): - queue_name = 'queue_name' - yield from self.channel.queue_declare(queue_name) + yield from self.channel.queue_declare(self.queue_name) # None is non-callable. We want to make sure the callback is # unregistered and never called. yield from self.channel.basic_consume(None) - yield from self.channel.queue_delete(queue_name) + yield from self.channel.queue_delete(self.queue_name) + + @testing.coroutine + def test_cancel_callbacks(self): + callback_calls = [] + + @asyncio.coroutine + def coroutine_callback(*args, **kwargs): + callback_calls.append((args, kwargs)) + + def function_callback(*args, **kwargs): + callback_calls.append((args, kwargs)) + + self.channel.add_cancellation_callback(coroutine_callback) + self.channel.add_cancellation_callback(function_callback) + + yield from self.channel.queue_declare(self.queue_name) + rv = yield from self.channel.basic_consume(consumer) + yield from self.channel.queue_delete(self.queue_name) + + self.assertEqual(2, len(callback_calls)) + for args, kwargs in callback_calls: + self.assertIs(self.channel, args[0]) + self.assertEqual(rv['consumer_tag'], args[1]) + + @testing.coroutine + def test_cancel_callback_exceptions(self): + callback_calls = [] + + def function_callback(*args, **kwargs): + callback_calls.append((args, kwargs)) + raise RuntimeError + + self.channel.add_cancellation_callback(function_callback) + self.channel.add_cancellation_callback(function_callback) + + yield from self.channel.queue_declare(self.queue_name) + yield from self.channel.basic_consume(consumer) + yield from self.channel.queue_delete(self.queue_name) + + self.assertEqual(2, len(callback_calls)) + self.assertTrue(self.channel.is_open) diff -Nru python-aioamqp-0.11.0/aioamqp/version.py python-aioamqp-0.12.0/aioamqp/version.py --- python-aioamqp-0.11.0/aioamqp/version.py 2018-06-20 15:33:33.000000000 +0000 +++ python-aioamqp-0.12.0/aioamqp/version.py 2018-11-30 16:44:20.000000000 +0000 @@ -1,2 +1,2 @@ -__version__ = '0.11.0' +__version__ = '0.12.0' __packagename__ = 'aioamqp' diff -Nru python-aioamqp-0.11.0/AUTHORS.rst python-aioamqp-0.12.0/AUTHORS.rst --- python-aioamqp-0.11.0/AUTHORS.rst 2018-06-20 15:33:33.000000000 +0000 +++ python-aioamqp-0.12.0/AUTHORS.rst 2018-11-30 16:44:20.000000000 +0000 @@ -19,4 +19,6 @@ * Alexander Gromyko * Nick Humrich * Pavel Kamaev - + * Mads Sejersen + * Dave Shawley + * Jacob Hagstedt P Suorra diff -Nru python-aioamqp-0.11.0/debian/changelog python-aioamqp-0.12.0/debian/changelog --- python-aioamqp-0.11.0/debian/changelog 2018-08-22 18:37:09.000000000 +0000 +++ python-aioamqp-0.12.0/debian/changelog 2018-12-22 21:47:38.000000000 +0000 @@ -1,3 +1,15 @@ +python-aioamqp (0.12.0-1) unstable; urgency=low + + [ Ondřej Nový ] + * Use 'python3 -m sphinx' instead of sphinx-build for building docs + + [ Michael Fladischer ] + * New upstream release. + * Add debian/gbp.conf. + * Bump Standards-Version to 4.2.1. + + -- Michael Fladischer Sat, 22 Dec 2018 22:47:38 +0100 + python-aioamqp (0.11.0-1) unstable; urgency=low * Initial release (Closes: #860010). diff -Nru python-aioamqp-0.11.0/debian/control python-aioamqp-0.12.0/debian/control --- python-aioamqp-0.11.0/debian/control 2018-08-22 18:37:09.000000000 +0000 +++ python-aioamqp-0.12.0/debian/control 2018-12-22 21:47:38.000000000 +0000 @@ -11,7 +11,7 @@ python3-setuptools, python3-sphinx, python3-sphinx-rtd-theme, -Standards-Version: 4.2.0 +Standards-Version: 4.2.1 Homepage: https://github.com/Polyconseil/aioamqp/ Vcs-Browser: https://salsa.debian.org/python-team/modules/python-aioamqp Vcs-Git: https://salsa.debian.org/python-team/modules/python-aioamqp.git diff -Nru python-aioamqp-0.11.0/debian/gbp.conf python-aioamqp-0.12.0/debian/gbp.conf --- python-aioamqp-0.11.0/debian/gbp.conf 1970-01-01 00:00:00.000000000 +0000 +++ python-aioamqp-0.12.0/debian/gbp.conf 2018-12-22 21:47:38.000000000 +0000 @@ -0,0 +1,2 @@ +[DEFAULT] +debian-branch=debian/master diff -Nru python-aioamqp-0.11.0/debian/rules python-aioamqp-0.12.0/debian/rules --- python-aioamqp-0.11.0/debian/rules 2018-08-22 18:37:09.000000000 +0000 +++ python-aioamqp-0.12.0/debian/rules 2018-12-22 21:47:38.000000000 +0000 @@ -10,7 +10,7 @@ dh $@ --with python3,sphinxdoc --buildsystem=pybuild override_dh_sphinxdoc: ifeq (,$(findstring nodoc, $(DEB_BUILD_OPTIONS))) - PYTHONPATH=. sphinx-build -b html -d docs/.build/.doctrees -N docs $(CURDIR)/debian/python-aioamqp-doc/usr/share/doc/python-aioamqp-doc/html + PYTHONPATH=. python3 -m sphinx -b html -d docs/.build/.doctrees -N docs $(CURDIR)/debian/python-aioamqp-doc/usr/share/doc/python-aioamqp-doc/html dh_sphinxdoc endif diff -Nru python-aioamqp-0.11.0/docker-compose.yaml python-aioamqp-0.12.0/docker-compose.yaml --- python-aioamqp-0.11.0/docker-compose.yaml 1970-01-01 00:00:00.000000000 +0000 +++ python-aioamqp-0.12.0/docker-compose.yaml 2018-11-30 16:44:20.000000000 +0000 @@ -0,0 +1,17 @@ +version: '3' +services: + aioamqp-test: + build: . + command: ["make", "test"] + depends_on: + - rabbitmq + environment: + - AMQP_HOST=rabbitmq + rabbitmq: + hostname: rabbitmq + image: rabbitmq:3-management + environment: + - RABBITMQ_NODENAME=my-rabbit + ports: + - 15672 + - 5672 diff -Nru python-aioamqp-0.11.0/Dockerfile python-aioamqp-0.12.0/Dockerfile --- python-aioamqp-0.11.0/Dockerfile 1970-01-01 00:00:00.000000000 +0000 +++ python-aioamqp-0.12.0/Dockerfile 2018-11-30 16:44:20.000000000 +0000 @@ -0,0 +1,7 @@ +FROM python:3.5 + +WORKDIR /usr/src/app + +COPY . . + +RUN pip install -r requirements_dev.txt diff -Nru python-aioamqp-0.11.0/docs/api.rst python-aioamqp-0.12.0/docs/api.rst --- python-aioamqp-0.11.0/docs/api.rst 2018-06-20 15:33:33.000000000 +0000 +++ python-aioamqp-0.12.0/docs/api.rst 2018-11-30 16:44:20.000000000 +0000 @@ -180,7 +180,33 @@ app_id cluster_id +Server Cancellation +~~~~~~~~~~~~~~~~~~~ +RabbitMQ offers an AMQP extension to notify a consumer when a queue is deleted. +See `Consumer Cancel Notification `_ +for additional details. ``aioamqp`` enables the extension for all channels but +takes no action when the consumer is cancelled. Your application can be notified +of consumer cancellations by adding a callback to the channel:: + + @asyncio.coroutine + def consumer_cancelled(channel, consumer_tag): + # implement required cleanup here + pass + + + @asyncio.coroutine + def consumer(channel, body, envelope, properties): + channel.basic_ack(envelope.delivery_tag) + + + channel = yield from protocol.channel() + channel.add_cancellation_callback(consumer_cancelled) + yield from channel.basic_consume(consumer, queue_name="my_queue") + +The callback can be a simple callable or an asynchronous co-routine. It can +be used to restart consumption on the channel, close the channel, or anything +else that is appropriate for your application. Queues ------ diff -Nru python-aioamqp-0.11.0/docs/changelog.rst python-aioamqp-0.12.0/docs/changelog.rst --- python-aioamqp-0.11.0/docs/changelog.rst 2018-06-20 15:33:33.000000000 +0000 +++ python-aioamqp-0.12.0/docs/changelog.rst 2018-11-30 16:44:20.000000000 +0000 @@ -4,6 +4,12 @@ Next release ------------ +Aioamqp 0.12.0 +-------------- + + * Fix an issue to use correct int encoder depending on int size (closes #180). + * Call user-specified callback when a consumer is cancelled. + Aioamqp 0.11.0 -------------- diff -Nru python-aioamqp-0.11.0/README.rst python-aioamqp-0.12.0/README.rst --- python-aioamqp-0.11.0/README.rst 2018-06-20 15:33:33.000000000 +0000 +++ python-aioamqp-0.12.0/README.rst 2018-11-30 16:44:20.000000000 +0000 @@ -30,6 +30,12 @@ Then you can run the tests with ``make test`` (requires ``nose``). +tests using docker-compose +^^^^^^^^^^^^^^^^^^^^^^^^^^ +Start RabbitMQ using ``docker-compose up -d rabbitmq``. When RabbitMQ has started, start the tests using ``docker-compose up --build aioamqp-test`` + + + .. _AMQP 0.9.1 protocol: https://www.rabbitmq.com/amqp-0-9-1-quickref.html .. _PEP 3156: http://www.python.org/dev/peps/pep-3156/ diff -Nru python-aioamqp-0.11.0/requirements_dev.txt python-aioamqp-0.12.0/requirements_dev.txt --- python-aioamqp-0.11.0/requirements_dev.txt 2018-06-20 15:33:33.000000000 +0000 +++ python-aioamqp-0.12.0/requirements_dev.txt 2018-11-30 16:44:20.000000000 +0000 @@ -3,4 +3,6 @@ nose coverage pylint +Sphinx +sphinx-rtd-theme -e git+https://github.com/bkjones/pyrabbit.git#egg=pyrabbit