diff -Nru python-persist-queue-0.4.0/appveyor.yml python-persist-queue-0.5.1/appveyor.yml
--- python-persist-queue-0.4.0/appveyor.yml	1970-01-01 00:00:00.000000000 +0000
+++ python-persist-queue-0.5.1/appveyor.yml	2020-11-09 05:35:13.000000000 +0000
@@ -0,0 +1,50 @@
+environment:
+
+  matrix:
+
+    # For Python versions available on Appveyor, see
+    # http://www.appveyor.com/docs/installed-software#python
+    # The list here is complete (excluding Python 2.6, which
+    # isn't covered by this document) at the time of writing.
+    - TOXENV: "pep8"
+      PYTHON: "C:\\Python27-x64"
+      DISTUTILS_USE_SDK: "1"
+    - TOXENV: "py27"
+      PYTHON: "C:\\Python27-x64"
+      DISTUTILS_USE_SDK: "1"
+    - TOXENV: "py34"
+      PYTHON: "C:\\Python34-x64"
+      DISTUTILS_USE_SDK: "1"
+    - TOXENV: "py35"
+      PYTHON: "C:\\Python35-x64"
+      DISTUTILS_USE_SDK: "1"
+    - TOXENV: "py36"
+      PYTHON: "C:\\Python36-x64"
+      DISTUTILS_USE_SDK: "1"
+    - TOXENV: "py37"
+      PYTHON: "C:\\Python37-x64"
+      DISTUTILS_USE_SDK: "1"
+    - TOXENV: "cover"
+      PYTHON: "C:\\Python36-x64"
+      DISTUTILS_USE_SDK: "1"
+
+install:
+  # We need wheel installed to build wheels
+  - "%PYTHON%\\python.exe -m pip install tox"
+
+build: off
+
+test_script:
+  # Put your test command here.
+  # If you don't need to build C extensions on 64-bit Python 3.3 or 3.4,
+  # you can remove "build.cmd" from the front of the command, as it's
+  # only needed to support those cases.
+  # Note that you must use the environment variable %PYTHON% to refer to
+  # the interpreter you're using - Appveyor does not do anything special
+  # to put the Python version you want to use on PATH.
+  - "%PYTHON%\\Scripts\\tox.exe"
+
+#on_success:
+#  You can use this step to upload your artifacts to a public website.
+#  See Appveyor's documentation for more details. Or you can simply
+#  access your wheels from the Appveyor "artifacts" tab for your build.
diff -Nru python-persist-queue-0.4.0/benchmark/run_benchmark.py python-persist-queue-0.5.1/benchmark/run_benchmark.py
--- python-persist-queue-0.4.0/benchmark/run_benchmark.py	1970-01-01 00:00:00.000000000 +0000
+++ python-persist-queue-0.5.1/benchmark/run_benchmark.py	2020-11-09 05:35:13.000000000 +0000
@@ -0,0 +1,151 @@
+"""This file provides tests to benchmark the performance of the sqlite/file
+queue on specific hardware. Users can easily evaluate the performance by
+running this file directly via `python run_benchmark.py`.
+"""
+from persistqueue import SQLiteQueue
+from persistqueue import Queue
+import tempfile
+import time
+
+BENCHMARK_COUNT = 100
+
+
+def time_it(func):
+    def _exec(*args, **kwargs):
+        start = time.time()
+        func(*args, **kwargs)
+        end = time.time()
+        print(
+            "\t{} => time used: {:.4f} seconds.".format(
+                func.__doc__,
+                (end - start)))
+
+    return _exec
+
+
+class FileQueueBench(object):
+    """Benchmark File queue performance."""
+
+    def __init__(self, prefix=None):
+        self.path = prefix
+
+    @time_it
+    def benchmark_file_write(self):
+        """Writing items."""
+
+        self.path = tempfile.mkdtemp('b_file_10000')
+        q = Queue(self.path)
+        for i in range(BENCHMARK_COUNT):
+            q.put('bench%d' % i)
+        assert q.qsize() == BENCHMARK_COUNT
+
+    @time_it
+    def benchmark_file_read_write_false(self):
+        """Writing and reading items(1 task_done)."""
+
+        self.path = tempfile.mkdtemp('b_file_10000')
+        q = Queue(self.path)
+        for i in range(BENCHMARK_COUNT):
+            q.put('bench%d' % i)
+
+        for i in range(BENCHMARK_COUNT):
+            q.get()
+        q.task_done()
+        assert q.qsize() == 0
+
+    @time_it
+    def benchmark_file_read_write_autosave(self):
+        """Writing and reading items(autosave)."""
+
+        self.path = tempfile.mkdtemp('b_file_10000')
+        q = Queue(self.path, autosave=True)
+        for i in range(BENCHMARK_COUNT):
+            q.put('bench%d' % i)
+
+        for i in range(BENCHMARK_COUNT):
+            q.get()
+        assert q.qsize() == 0
+
+    @time_it
+    def benchmark_file_read_write_true(self):
+        """Writing and reading items(many task_done)."""
+
+        self.path = tempfile.mkdtemp('b_file_10000')
+        q = Queue(self.path)
+        for i in range(BENCHMARK_COUNT):
+            q.put('bench%d' % i)
+
+        for i in range(BENCHMARK_COUNT):
+            q.get()
+            q.task_done()
+        assert q.qsize() == 0
+
+    @classmethod
+    def run(cls):
+        print(cls.__doc__)
+        ins = cls()
+        for name in sorted(cls.__dict__):
+            if name.startswith('benchmark'):
+                func = getattr(ins, name)
+                func()
+
+
+class Sqlite3QueueBench(object):
+    """Benchmark Sqlite3 queue performance."""
+
+    @time_it
+    def benchmark_sqlite_write(self):
+        """Writing items."""
+
+        self.path = tempfile.mkdtemp('b_sql_10000')
+        q = SQLiteQueue(self.path, auto_commit=False)
+        for i in range(BENCHMARK_COUNT):
+            q.put('bench%d' % i)
+
+        assert q.qsize() == BENCHMARK_COUNT
+
+    @time_it
+    def benchmark_sqlite_read_write_false(self):
+        """Writing and reading items(1 task_done)."""
+        self.path = tempfile.mkdtemp('b_sql_10000')
+        q = SQLiteQueue(self.path, auto_commit=False)
+        for i in range(BENCHMARK_COUNT):
+            q.put('bench%d' % i)
+        for i in range(BENCHMARK_COUNT):
+            q.get()
+        q.task_done()
+        assert q.qsize() == 0
+
+    @time_it
+    def benchmark_sqlite_read_write_true(self):
+        """Writing and reading items(many task_done)."""
+        self.path = tempfile.mkdtemp('b_sql_10000')
+        q = SQLiteQueue(self.path, auto_commit=True)
+        for i in range(BENCHMARK_COUNT):
+            q.put('bench%d' % i)
+
+        for i in range(BENCHMARK_COUNT):
+            q.get()
+            q.task_done()
+        assert q.qsize() == 0
+
+    @classmethod
+    def run(cls):
+        print(cls.__doc__)
+        ins = cls()
+        for name in sorted(cls.__dict__):
+
+            if name.startswith('benchmark'):
+                func = getattr(ins, name)
+                func()
+
+
+if __name__ == '__main__':
+    import sys
+
+    if len(sys.argv) > 1:
+        BENCHMARK_COUNT = int(sys.argv[1])
+    print("<BENCHMARK_COUNT> = {}".format(BENCHMARK_COUNT))
+    file_bench = FileQueueBench()
+    file_bench.run()
+    sql_bench = Sqlite3QueueBench()
+    sql_bench.run()
diff -Nru python-persist-queue-0.4.0/.circleci/config.yml python-persist-queue-0.5.1/.circleci/config.yml
--- python-persist-queue-0.4.0/.circleci/config.yml	1970-01-01 00:00:00.000000000 +0000
+++ python-persist-queue-0.5.1/.circleci/config.yml	2020-11-09 05:35:13.000000000 +0000
@@ -0,0 +1,95 @@
+version: 2
+jobs:
+  py27:
+    docker:
+      # Primary container image where all steps run.
+      - image: circleci/python:2.7.15
+        environment:
+          - TOXENV: py27
+    steps: &common_steps
+      - checkout
+      - run:
+          command: |
+            sudo pip install tox
+      - run:
+          command: |  # tell the operating system to remove the file size limit on core dump files
+            ulimit -c unlimited
+            tox
+      - run: bash <(curl -s https://codecov.io/bash) -cF python
+      - run:
+          command: |
+            mkdir -p /tmp/core_dumps
+            cp core.* /tmp/core_dumps
+          when: on_fail
+      - store_artifacts:
+          # collect core dumps
+          path: /tmp/core_dumps
+      - store_artifacts:
+          path: .coverage
+      - store_artifacts:
+          path: coverage.xml
+      - store_artifacts:
+          path: htmlcov
+
+  py34:
+    docker:
+      # Primary container image where all steps run.
+      - image: circleci/python:3.4.7
+        environment:
+          - TOXENV: py34
+    steps: *common_steps
+
+  py35:
+    docker:
+      # Primary container image where all steps run.
+      - image: circleci/python:3.5.5
+        environment:
+          - TOXENV: py35
+
+    steps: *common_steps
+
+  py36:
+    docker:
+      # Primary container image where all steps run.
+      - image: circleci/python:3.6.5
+        environment:
+          - TOXENV: py36
+    steps: *common_steps
+
+  py37:
+    docker:
+      # Primary container image where all steps run.
+      - image: circleci/python:3.7.0
+        environment:
+          - TOXENV: py37
+    steps: *common_steps
+
+
+  pep8:
+    docker:
+      # Primary container image where all steps run.
+      - image: circleci/python:3.5.4
+        environment:
+          - TOXENV: pep8
+    steps: *common_steps
+
+
+  cover:
+    docker:
+      # Primary container image where all steps run.
+      - image: circleci/python:3.5.4
+        environment:
+          - TOXENV: cover
+    steps: *common_steps
+
+workflows:
+  version: 2
+  build:
+    jobs:
+      - pep8
+      - py27
+      - py34
+      - py35
+      - py36
+      - py37
+      - cover
diff -Nru python-persist-queue-0.4.0/.coveragerc python-persist-queue-0.5.1/.coveragerc
--- python-persist-queue-0.4.0/.coveragerc	1970-01-01 00:00:00.000000000 +0000
+++ python-persist-queue-0.5.1/.coveragerc	2020-11-09 05:35:13.000000000 +0000
@@ -0,0 +1,9 @@
+[run]
+branch = True
+source = persistqueue/*
+omit =
+    ./tests/*
+    ./.tox/*
+    ./setup.py
+[xml]
+output = coverage.xml
diff -Nru python-persist-queue-0.4.0/debian/changelog python-persist-queue-0.5.1/debian/changelog
--- python-persist-queue-0.4.0/debian/changelog	2018-09-06 09:33:45.000000000 +0000
+++ python-persist-queue-0.5.1/debian/changelog	2021-01-02 00:08:37.000000000 +0000
@@ -1,3 +1,22 @@
+python-persist-queue (0.5.1-1) unstable; urgency=medium
+
+  * New upstream version.
+
+  [ Ondřej Nový ]
+  * Use debhelper-compat instead of debian/compat.
+  * d/control: Update Maintainer field with new Debian Python Team
+    contact address.
+  * d/control: Update Vcs-* fields with new Debian Python Team Salsa
+    layout.
+
+  [ Debian Janitor ]
+  * Bump debhelper from old 11 to 12.
+  * Set upstream metadata fields: Bug-Database, Bug-Submit, Repository,
+    Repository-Browse.
+  * Add debian/watch file, using pypi.
+
+ -- Martin <debacle@debian.org>  Sat, 02 Jan 2021 00:08:37 +0000
+
 python-persist-queue (0.4.0-1) unstable; urgency=medium
 
   * Initial package (Closes: #907971)
diff -Nru python-persist-queue-0.4.0/debian/compat python-persist-queue-0.5.1/debian/compat
--- python-persist-queue-0.4.0/debian/compat	2018-09-05 20:13:59.000000000 +0000
+++ python-persist-queue-0.5.1/debian/compat	1970-01-01 00:00:00.000000000 +0000
@@ -1 +0,0 @@
-11
diff -Nru python-persist-queue-0.4.0/debian/control python-persist-queue-0.5.1/debian/control
--- python-persist-queue-0.4.0/debian/control	2018-09-06 09:33:45.000000000 +0000
+++ python-persist-queue-0.5.1/debian/control	2021-01-02 00:08:37.000000000 +0000
@@ -1,18 +1,20 @@
 Source: python-persist-queue
-Maintainer: Debian Python Modules Team <python-modules-team@lists.alioth.debian.org>
-Uploaders: W. Martin Borgert <debacle@debian.org>
+Maintainer: Debian Python Team <team+python@tracker.debian.org>
+Uploaders: Martin <debacle@debian.org>
 Section: python
 Priority: optional
-Build-Depends: debhelper (>= 11),
+Build-Depends: debhelper-compat (= 13),
                dh-python,
                python3-all,
                python3-mock,
+               python3-msgpack,
+               python3-nose2,
                python3-setuptools,
 Standards-Version: 4.2.1
 Rules-Requires-Root: no
 Homepage: https://github.com/peter-wangxu/persist-queue
-Vcs-Git: https://salsa.debian.org/python-team/modules/python-persist-queue.git
-Vcs-Browser: https://salsa.debian.org/python-team/modules/python-persist-queue
+Vcs-Git: https://salsa.debian.org/python-team/packages/python-persist-queue.git
+Vcs-Browser: https://salsa.debian.org/python-team/packages/python-persist-queue
 
 Package: python3-persist-queue
 Architecture: all
diff -Nru python-persist-queue-0.4.0/debian/patches/series python-persist-queue-0.5.1/debian/patches/series
--- python-persist-queue-0.4.0/debian/patches/series	1970-01-01 00:00:00.000000000 +0000
+++ python-persist-queue-0.5.1/debian/patches/series	2021-01-02 00:08:37.000000000 +0000
@@ -0,0 +1 @@
+skip-unittest.patch
diff -Nru python-persist-queue-0.4.0/debian/patches/skip-unittest.patch python-persist-queue-0.5.1/debian/patches/skip-unittest.patch
--- python-persist-queue-0.4.0/debian/patches/skip-unittest.patch	1970-01-01 00:00:00.000000000 +0000
+++ python-persist-queue-0.5.1/debian/patches/skip-unittest.patch	2021-01-02 00:08:37.000000000 +0000
@@ -0,0 +1,16 @@
+Description: disable unittest
+Author: Martin <debacle@debian.org>
+Origin: vendor
+Last-Update: 2021-01-02
+---
+This patch header follows DEP-3: http://dep.debian.net/deps/dep3/
+--- a/persistqueue/tests/test_queue.py
++++ b/persistqueue/tests/test_queue.py
+@@ -220,6 +220,7 @@
+         with self.assertRaises(ValueError):
+             q.put('var1', timeout=-1)
+ 
++    @unittest.skip("disable unittest")
+     @params(*serializer_params)
+     def test_put_block_and_wait(self, serializer):
+         """Test block until queue is not full."""
diff -Nru python-persist-queue-0.4.0/debian/upstream/metadata python-persist-queue-0.5.1/debian/upstream/metadata
--- python-persist-queue-0.4.0/debian/upstream/metadata	1970-01-01 00:00:00.000000000 +0000
+++ python-persist-queue-0.5.1/debian/upstream/metadata	2021-01-02 00:08:28.000000000 +0000
@@ -0,0 +1,4 @@
+Bug-Database: https://github.com/peter-wangxu/persist-queue/issues
+Bug-Submit: https://github.com/peter-wangxu/persist-queue/issues/new
+Repository: https://github.com/peter-wangxu/persist-queue.git
+Repository-Browse: https://github.com/peter-wangxu/persist-queue
diff -Nru python-persist-queue-0.4.0/debian/watch python-persist-queue-0.5.1/debian/watch
--- python-persist-queue-0.4.0/debian/watch	1970-01-01 00:00:00.000000000 +0000
+++ python-persist-queue-0.5.1/debian/watch	2021-01-02 00:08:28.000000000 +0000
@@ -0,0 +1,2 @@
+version=4
+https://pypi.debian.net/persist-queue/persist-queue-(.+)\.(?:zip|tgz|tbz|txz|(?:tar\.(?:gz|bz2|xz)))
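With the two-line watch file above in place, uscan(1) can poll pypi.debian.net
for new upstream tarballs. A typical manual check looks like this (an
illustrative invocation, not part of the packaging diff):

.. code-block:: console

    $ uscan --report --verbose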
diff -Nru python-persist-queue-0.4.0/extra-requirements.txt python-persist-queue-0.5.1/extra-requirements.txt
--- python-persist-queue-0.4.0/extra-requirements.txt	1970-01-01 00:00:00.000000000 +0000
+++ python-persist-queue-0.5.1/extra-requirements.txt	2020-11-09 05:35:13.000000000 +0000
@@ -0,0 +1 @@
+msgpack>=0.5.6
\ No newline at end of file
diff -Nru python-persist-queue-0.4.0/.gitignore python-persist-queue-0.5.1/.gitignore
--- python-persist-queue-0.4.0/.gitignore	1970-01-01 00:00:00.000000000 +0000
+++ python-persist-queue-0.5.1/.gitignore	2020-11-09 05:35:13.000000000 +0000
@@ -0,0 +1,92 @@
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+env/
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+*.egg-info/
+.installed.cfg
+*.egg
+.testrepository/
+# PyInstaller
+#  Usually these files are written by a python script from a template
+#  before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*,cover
+.hypothesis/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+local_settings.py
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
+
+# IPython Notebook
+.ipynb_checkpoints
+
+# pyenv
+.python-version
+
+# celery beat schedule file
+celerybeat-schedule
+
+# dotenv
+.env
+
+# virtualenv
+venv/
+ENV/
+
+# Spyder project settings
+.spyderproject
+
+# Rope project settings
+.ropeproject
+
+# IDE specific folders
+.idea/
diff -Nru python-persist-queue-0.4.0/persistqueue/__init__.py python-persist-queue-0.5.1/persistqueue/__init__.py
--- python-persist-queue-0.4.0/persistqueue/__init__.py	2018-06-17 12:12:00.000000000 +0000
+++ python-persist-queue-0.5.1/persistqueue/__init__.py	2020-11-09 05:35:13.000000000 +0000
@@ -1,14 +1,22 @@
 # coding=utf-8
 __author__ = 'Peter Wang'
 __license__ = 'BSD'
-__version__ = '0.4.0'
+__version__ = '0.5.1'
 
 from .exceptions import Empty, Full  # noqa
-from .pdict import PDict  # noqa
 from .queue import Queue  # noqa
-from .sqlqueue import SQLiteQueue, FIFOSQLiteQueue, FILOSQLiteQueue, UniqueQ  # noqa
-from .sqlackqueue import SQLiteAckQueue
+
+try:
+    from .pdict import PDict  # noqa
+    from .sqlqueue import SQLiteQueue, FIFOSQLiteQueue, FILOSQLiteQueue, \
+        UniqueQ  # noqa
+    from .sqlackqueue import SQLiteAckQueue, UniqueAckQ
+except ImportError:
+    import logging
+
+    log = logging.getLogger(__name__)
+    log.info("No sqlite3 module found, sqlite3 based queues are not available")
 
 __all__ = ["Queue", "SQLiteQueue", "FIFOSQLiteQueue", "FILOSQLiteQueue",
-           "UniqueQ", "PDict", "SQLiteAckQueue", "Empty", "Full",
+           "UniqueQ", "PDict", "SQLiteAckQueue", "UniqueAckQ", "Empty", "Full",
            "__author__", "__license__", "__version__"]
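The try/except block above makes the sqlite3-backed classes optional. A
minimal sketch of how a caller might degrade gracefully when sqlite3 is
unavailable (illustrative only; on an ImportError the sqlite-backed names are
simply never defined on the package, so looking them up raises
AttributeError):

.. code-block:: python

    import persistqueue

    try:
        q = persistqueue.SQLiteQueue('mypath', auto_commit=True)
    except AttributeError:
        # sqlite3 was missing at import time; fall back to the
        # file-based queue, which has no sqlite3 dependency.
        q = persistqueue.Queue('mypath')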
""" log.debug('Initializing File based Queue with path {}'.format(path)) self.path = path self.chunksize = chunksize self.tempdir = tempdir self.maxsize = maxsize - self.protocol = None + self.serializer = serializer + self.autosave = autosave self._init(maxsize) if self.tempdir: if os.stat(self.path).st_dev != os.stat(self.tempdir).st_dev: raise ValueError("tempdir has to be located " "on same path filesystem") + else: + _, tempdir = tempfile.mkstemp() + if os.stat(self.path).st_dev != os.stat(tempdir).st_dev: + self.tempdir = self.path + log.warning("Default tempdir '%(dft_dir)s' is not on the " + "same filesystem with queue path '%(queue_path)s'" + ",defaulting to '%(new_path)s'." % { + "dft_dir": tempdir, + "queue_path": self.path, + "new_path": self.tempdir}) self.info = self._loadinfo() # truncate head case it contains garbage hnum, hcnt, hoffset = self.info['head'] @@ -72,7 +131,6 @@ if not os.path.exists(self.path): os.makedirs(self.path) - self.protocol = common.select_pickle_protocol() def join(self): with self.all_tasks_done: @@ -88,6 +146,9 @@ def _qsize(self): return self.info['size'] + def empty(self): + return self.qsize() == 0 + def put(self, item, block=True, timeout=None): "Interface for putting item in disk-based queue." self.not_full.acquire() @@ -115,13 +176,16 @@ self.not_full.release() def _put(self, item): - pickle.dump(item, self.headf) + self.serializer.dump(item, self.headf) self.headf.flush() hnum, hpos, _ = self.info['head'] hpos += 1 if hpos == self.info['chunksize']: hpos = 0 hnum += 1 + # make sure data is written to disk whatever + # its underlying file system + os.fsync(self.headf.fileno()) self.headf.close() self.headf = self._openchunk(hnum, 'ab+') self.info['size'] += 1 @@ -163,7 +227,7 @@ hnum, hcnt, _ = self.info['head'] if [tnum, tcnt] >= [hnum, hcnt]: return None - data = pickle.load(self.tailf) + data = self.serializer.load(self.tailf) toffset = self.tailf.tell() tcnt += 1 if tcnt == self.info['chunksize'] and tnum <= hnum: @@ -173,7 +237,11 @@ self.tailf = self._openchunk(tnum) self.info['size'] -= 1 self.info['tail'] = [tnum, tcnt, toffset] - self.update_info = True + if self.autosave: + self._saveinfo() + self.update_info = False + else: + self.update_info = True return data def task_done(self): @@ -187,6 +255,8 @@ self._task_done() def _task_done(self): + if self.autosave: + return if self.update_info: self._saveinfo() self.update_info = False @@ -198,7 +268,7 @@ infopath = self._infopath() if os.path.exists(infopath): with open(infopath, 'rb') as f: - info = pickle.load(f) + info = self.serializer.load(f) else: info = { 'chunksize': self.chunksize, @@ -216,17 +286,9 @@ def _saveinfo(self): tmpfd, tmpfn = self._gettempfile() - os.write(tmpfd, pickle.dumps(self.info)) - os.close(tmpfd) - # POSIX requires that 'rename' is an atomic operation - try: - os.rename(tmpfn, self._infopath()) - except OSError as e: - if getattr(e, 'winerror', None) == 183: - os.remove(self._infopath()) - os.rename(tmpfn, self._infopath()) - else: - raise + with os.fdopen(tmpfd, "wb") as tmpfo: + self.serializer.dump(self.info, tmpfo) + atomic_rename(tmpfn, self._infopath()) self._clear_tail_file() def _clear_tail_file(self): @@ -245,3 +307,9 @@ def _infopath(self): return os.path.join(self.path, 'info') + + def __del__(self): + """Handles the removal of queue.""" + for to_close in [self.headf, self.tailf]: + if to_close and not to_close.closed: + to_close.close() diff -Nru python-persist-queue-0.4.0/persistqueue/serializers/__init__.py 
diff -Nru python-persist-queue-0.4.0/persistqueue/serializers/__init__.py python-persist-queue-0.5.1/persistqueue/serializers/__init__.py
--- python-persist-queue-0.4.0/persistqueue/serializers/__init__.py	1970-01-01 00:00:00.000000000 +0000
+++ python-persist-queue-0.5.1/persistqueue/serializers/__init__.py	2020-11-09 05:35:13.000000000 +0000
@@ -0,0 +1 @@
+#! coding = utf-8
diff -Nru python-persist-queue-0.4.0/persistqueue/serializers/json.py python-persist-queue-0.5.1/persistqueue/serializers/json.py
--- python-persist-queue-0.4.0/persistqueue/serializers/json.py	1970-01-01 00:00:00.000000000 +0000
+++ python-persist-queue-0.5.1/persistqueue/serializers/json.py	2020-11-09 05:35:13.000000000 +0000
@@ -0,0 +1,30 @@
+#! coding = utf-8
+
+"""
+A serializer that extends json to use bytes and uses newlines to store
+multiple objects per file.
+"""
+
+from __future__ import absolute_import
+import json
+
+
+def dump(value, fp, sort_keys=False):
+    "Serialize value as json line to a byte-mode file object"
+    fp.write(json.dumps(value, sort_keys=sort_keys).encode())
+    fp.write(b"\n")
+
+
+def dumps(value, sort_keys=False):
+    "Serialize value as json to bytes"
+    return json.dumps(value, sort_keys=sort_keys).encode()
+
+
+def load(fp):
+    "Deserialize one json line from a byte-mode file object"
+    return json.loads(fp.readline().decode())
+
+
+def loads(bytes_value):
+    "Deserialize one json value from bytes"
+    return json.loads(bytes_value.decode())
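A quick round trip through this serializer illustrates the newline framing (a
sketch against the API above, not part of the upstream diff):

.. code-block:: python

    import io

    from persistqueue.serializers import json as json_serializer

    buf = io.BytesIO()
    json_serializer.dump({'a': 1}, buf)  # writes b'{"a": 1}\n'
    json_serializer.dump([1, 2], buf)    # appends a second line
    buf.seek(0)
    assert json_serializer.load(buf) == {'a': 1}
    assert json_serializer.load(buf) == [1, 2]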
+""" + +from __future__ import absolute_import +import msgpack +import struct + + +def dump(value, fp, sort_keys=False): + "Serialize value as msgpack to a byte-mode file object" + if sort_keys and isinstance(value, dict): + value = {key: value[key] for key in sorted(value)} + packed = msgpack.packb(value, use_bin_type=True) + length = struct.pack(" 0: + q.get_nowait() + q.task_done() + n -= 1 + else: + with self.assertRaises(Empty): + q.get_nowait() + else: + q.put('var%d' % random.getrandbits(16)) + n += 1 + + @params(*serializer_params) + def test_multi_threaded(self, serializer): + """Create consumer and producer threads, check parallelism""" + + q = Queue(self.path, **serializer_params[serializer]) + + def producer(): + for i in range(1000): + q.put('var%d' % i) + + def consumer(): + for i in range(1000): + q.get() + q.task_done() + + c = Thread(target=consumer) + c.start() + p = Thread(target=producer) + p.start() + c.join() + p.join() + + q.join() + with self.assertRaises(Empty): + q.get_nowait() + + @params(*serializer_params) + def test_garbage_on_head(self, serializer): + """Adds garbage to the queue head and let the internal integrity + checks fix it""" + + q = Queue(self.path, **serializer_params[serializer]) + q.put('var1') + del q + + with open(os.path.join(self.path, 'q00000'), 'ab') as f: + f.write(b'garbage') + q = Queue(self.path, **serializer_params[serializer]) + q.put('var2') + + self.assertEqual(2, q.qsize()) + self.assertEqual('var1', q.get()) + q.task_done() + + @params(*serializer_params) + def test_task_done_too_many_times(self, serializer): + """Test too many task_done called.""" + q = Queue(self.path, **serializer_params[serializer]) + q.put('var1') + q.get() + q.task_done() + + with self.assertRaises(ValueError): + q.task_done() + + @params(*serializer_params) + def test_get_timeout_negative(self, serializer): + q = Queue(self.path, **serializer_params[serializer]) + q.put('var1') + with self.assertRaises(ValueError): + q.get(timeout=-1) + + @params(*serializer_params) + def test_get_timeout(self, serializer): + """Test when get failed within timeout.""" + q = Queue(self.path, **serializer_params[serializer]) + q.put('var1') + q.get() + with self.assertRaises(Empty): + q.get(timeout=1) + + @params(*serializer_params) + def test_put_nowait(self, serializer): + """Tests the put_nowait interface.""" + q = Queue(self.path, **serializer_params[serializer]) + q.put_nowait('var1') + self.assertEqual('var1', q.get()) + q.task_done() + + @params(*serializer_params) + def test_put_maxsize_reached(self, serializer): + """Test that maxsize reached.""" + q = Queue(self.path, maxsize=10, **serializer_params[serializer]) + for x in range(10): + q.put(x) + + with self.assertRaises(Full): + q.put('full_now', block=False) + + @params(*serializer_params) + def test_put_timeout_reached(self, serializer): + """Test put with block and timeout.""" + q = Queue(self.path, maxsize=2, **serializer_params[serializer]) + for x in range(2): + q.put(x) + + with self.assertRaises(Full): + q.put('full_and_timeout', block=True, timeout=1) + + @params(*serializer_params) + def test_put_timeout_negative(self, serializer): + """Test and put with timeout < 0""" + q = Queue(self.path, maxsize=1, **serializer_params[serializer]) + with self.assertRaises(ValueError): + q.put('var1', timeout=-1) + + @params(*serializer_params) + def test_put_block_and_wait(self, serializer): + """Test block until queue is not full.""" + q = Queue(self.path, maxsize=10, **serializer_params[serializer]) + + def 
diff -Nru python-persist-queue-0.4.0/persistqueue/tests/test_queue.py python-persist-queue-0.5.1/persistqueue/tests/test_queue.py
--- python-persist-queue-0.4.0/persistqueue/tests/test_queue.py	1970-01-01 00:00:00.000000000 +0000
+++ python-persist-queue-0.5.1/persistqueue/tests/test_queue.py	2020-11-09 05:35:13.000000000 +0000
+    @params(*serializer_params)
+    def test_random_read_write(self, serializer):
+        """Test random read/write"""
+
+        q = Queue(self.path, **serializer_params[serializer])
+        n = 0
+        for _ in range(1000):
+            if random.random() < 0.5:
+                if n > 0:
+                    q.get_nowait()
+                    q.task_done()
+                    n -= 1
+                else:
+                    with self.assertRaises(Empty):
+                        q.get_nowait()
+            else:
+                q.put('var%d' % random.getrandbits(16))
+                n += 1
+
+    @params(*serializer_params)
+    def test_multi_threaded(self, serializer):
+        """Create consumer and producer threads, check parallelism"""
+
+        q = Queue(self.path, **serializer_params[serializer])
+
+        def producer():
+            for i in range(1000):
+                q.put('var%d' % i)
+
+        def consumer():
+            for i in range(1000):
+                q.get()
+                q.task_done()
+
+        c = Thread(target=consumer)
+        c.start()
+        p = Thread(target=producer)
+        p.start()
+        c.join()
+        p.join()
+
+        q.join()
+        with self.assertRaises(Empty):
+            q.get_nowait()
+
+    @params(*serializer_params)
+    def test_garbage_on_head(self, serializer):
+        """Adds garbage to the queue head and lets the internal integrity
+        checks fix it"""
+
+        q = Queue(self.path, **serializer_params[serializer])
+        q.put('var1')
+        del q
+
+        with open(os.path.join(self.path, 'q00000'), 'ab') as f:
+            f.write(b'garbage')
+        q = Queue(self.path, **serializer_params[serializer])
+        q.put('var2')
+
+        self.assertEqual(2, q.qsize())
+        self.assertEqual('var1', q.get())
+        q.task_done()
+
+    @params(*serializer_params)
+    def test_task_done_too_many_times(self, serializer):
+        """Test task_done called too many times."""
+        q = Queue(self.path, **serializer_params[serializer])
+        q.put('var1')
+        q.get()
+        q.task_done()
+
+        with self.assertRaises(ValueError):
+            q.task_done()
+
+    @params(*serializer_params)
+    def test_get_timeout_negative(self, serializer):
+        q = Queue(self.path, **serializer_params[serializer])
+        q.put('var1')
+        with self.assertRaises(ValueError):
+            q.get(timeout=-1)
+
+    @params(*serializer_params)
+    def test_get_timeout(self, serializer):
+        """Test when get failed within timeout."""
+        q = Queue(self.path, **serializer_params[serializer])
+        q.put('var1')
+        q.get()
+        with self.assertRaises(Empty):
+            q.get(timeout=1)
+
+    @params(*serializer_params)
+    def test_put_nowait(self, serializer):
+        """Tests the put_nowait interface."""
+        q = Queue(self.path, **serializer_params[serializer])
+        q.put_nowait('var1')
+        self.assertEqual('var1', q.get())
+        q.task_done()
+
+    @params(*serializer_params)
+    def test_put_maxsize_reached(self, serializer):
+        """Test that maxsize is reached."""
+        q = Queue(self.path, maxsize=10, **serializer_params[serializer])
+        for x in range(10):
+            q.put(x)
+
+        with self.assertRaises(Full):
+            q.put('full_now', block=False)
+
+    @params(*serializer_params)
+    def test_put_timeout_reached(self, serializer):
+        """Test put with block and timeout."""
+        q = Queue(self.path, maxsize=2, **serializer_params[serializer])
+        for x in range(2):
+            q.put(x)
+
+        with self.assertRaises(Full):
+            q.put('full_and_timeout', block=True, timeout=1)
+
+    @params(*serializer_params)
+    def test_put_timeout_negative(self, serializer):
+        """Test put with timeout < 0"""
+        q = Queue(self.path, maxsize=1, **serializer_params[serializer])
+        with self.assertRaises(ValueError):
+            q.put('var1', timeout=-1)
+
+    @params(*serializer_params)
+    def test_put_block_and_wait(self, serializer):
+        """Test block until queue is not full."""
+        q = Queue(self.path, maxsize=10, **serializer_params[serializer])
+
+        def consumer():
+            for i in range(5):
+                q.get()
+                q.task_done()
+
+        def producer():
+            for j in range(16):
+                q.put('var%d' % j)
+
+        p = Thread(target=producer)
+        p.start()
+        c = Thread(target=consumer)
+        c.start()
+        c.join()
+        val = q.get_nowait()
+        p.join()
+        self.assertEqual('var5', val)
+
+    @params(*serializer_params)
+    def test_clear_tail_file(self, serializer):
+        """Test that the tail file is only removed when calling task_done."""
+        q = Queue(self.path, chunksize=10, **serializer_params[serializer])
+        for i in range(35):
+            q.put('var%d' % i)
+
+        for _ in range(15):
+            q.get()
+
+        q = Queue(self.path, chunksize=10, **serializer_params[serializer])
+        self.assertEqual(q.qsize(), 35)
+
+        for _ in range(15):
+            q.get()
+            # the first tail file gets removed after task_done
+            q.task_done()
+        for _ in range(16):
+            q.get()
+            # the second and third files get removed after task_done
+            q.task_done()
+        self.assertEqual(q.qsize(), 4)
+
+    def test_protocol(self):
+        # test that protocol is set properly
+        expect_protocol = 2 if sys.version_info[0] == 2 else 4
+        self.assertEqual(
+            serializers_pickle.protocol,
+            expect_protocol,
+        )
+
+        # test that protocol is used properly
+        serializer = namedtuple("Serializer", ["dump", "load"])(
+            serializers_pickle.dump, lambda fp: fp.read())
+
+        q = Queue(path=self.path, serializer=serializer)
+        q.put(b'a')
+        self.assertEqual(q.get(), pickle.dumps(b'a', protocol=expect_protocol))
+
+    @params(*serializer_params)
+    def test_del(self, serializer):
+        """test that __del__ can be called successfully"""
+        q = Queue(self.path, **serializer_params[serializer])
+        q.__del__()
+        self.assertTrue(q.headf.closed)
+        self.assertTrue(q.tailf.closed)
+
+    @params(*serializer_params)
+    def test_autosave_get(self, serializer):
+        """test the autosave feature saves on get()"""
+        q = Queue(self.path, autosave=True, **serializer_params[serializer])
+        q.put('var1')
+        q.put('var2')
+        self.assertEqual('var1', q.get())
+        del q
+        # queue should save on get(), only one item should remain
+        q = Queue(self.path, autosave=True, **serializer_params[serializer])
+        self.assertEqual(1, q.qsize())
+        self.assertEqual('var2', q.get())
+        del q
+
+    @params(*serializer_params)
+    def test_autosave_join(self, serializer):
+        """Enabling autosave should still allow task_done/join behavior"""
+        q = Queue(self.path, autosave=True, **serializer_params[serializer])
+        for i in range(10):
+            q.put('var%d' % i)
+
+        def consumer():
+            for i in range(10):
+                q.get()
+                # this should still 'count down' properly and allow q.join()
+                # to finish
+                q.task_done()
+
+        c = Thread(target=consumer)
+        c.start()
+        q.join()
+        with self.assertRaises(Empty):
+            q.get_nowait()
diff -Nru python-persist-queue-0.4.0/persistqueue/tests/test_sqlackqueue.py python-persist-queue-0.5.1/persistqueue/tests/test_sqlackqueue.py
--- python-persist-queue-0.4.0/persistqueue/tests/test_sqlackqueue.py	1970-01-01 00:00:00.000000000 +0000
+++ python-persist-queue-0.5.1/persistqueue/tests/test_sqlackqueue.py	2020-11-09 05:35:13.000000000 +0000
@@ -0,0 +1,372 @@
+# coding=utf-8
+
+import random
+import shutil
+import sys
+import tempfile
+import unittest
+from threading import Thread
+
+from persistqueue.sqlackqueue import (
+    SQLiteAckQueue,
+    FILOSQLiteAckQueue,
+    UniqueAckQ)
+from persistqueue import Empty
+
+
+class SQLite3AckQueueTest(unittest.TestCase):
+    def setUp(self):
+        self.path = tempfile.mkdtemp(suffix='sqlackqueue')
+        self.auto_commit = True
+
+    def tearDown(self):
+        shutil.rmtree(self.path, ignore_errors=True)
+
+    def test_raise_empty(self):
+        q = SQLiteAckQueue(self.path, auto_commit=self.auto_commit)
+
+        q.put('first')
+        d = q.get()
+        self.assertEqual('first', d)
+        self.assertRaises(Empty, q.get, block=False)
+
+        # assert with timeout
+        self.assertRaises(Empty, q.get, block=True, timeout=1.0)
+        # assert with negative timeout
+        self.assertRaises(ValueError, q.get, block=True, timeout=-1.0)
+
+    def test_empty(self):
+        q = SQLiteAckQueue(self.path, auto_commit=self.auto_commit)
+        self.assertEqual(q.empty(), True)
+
+        q.put('first')
+        self.assertEqual(q.empty(), False)
+
+        q.get()
+        self.assertEqual(q.empty(), True)
+
+    def test_open_close_single(self):
+        """Write 1 item, close, reopen checking if same item is there"""
+
+        q = SQLiteAckQueue(self.path, auto_commit=self.auto_commit)
+        q.put(b'var1')
+        del q
+        q = SQLiteAckQueue(self.path)
+        self.assertEqual(1, q.qsize())
+        self.assertEqual(b'var1', q.get())
+
+    def test_open_close_1000(self):
+        """Write 1000 items, close, reopen checking if all items are there"""
+
+        q = SQLiteAckQueue(self.path, auto_commit=self.auto_commit)
+        for i in range(1000):
+            q.put('var%d' % i)
+
+        self.assertEqual(1000, q.qsize())
+        del q
+        q = SQLiteAckQueue(self.path)
+        self.assertEqual(1000, q.qsize())
+        for i in range(1000):
+            data = q.get()
+            self.assertEqual('var%d' % i, data)
+        # assert adding another one still works
+        q.put('foobar')
+        data = q.get()
+        self.assertEqual('foobar', data)
+
+    def test_random_read_write(self):
+        """Test random read/write"""
+
+        q = SQLiteAckQueue(self.path, auto_commit=self.auto_commit)
+        n = 0
+        for _ in range(1000):
+            if random.random() < 0.5:
+                if n > 0:
+                    q.get()
+                    n -= 1
+                else:
+                    self.assertRaises(Empty, q.get, block=False)
+            else:
+                q.put('var%d' % random.getrandbits(16))
+                n += 1
+
+    def test_multi_threaded_parallel(self):
+        """Create consumer and producer threads, check parallelism"""
+
+        # self.skipTest("Not supported multi-thread.")
+
+        m_queue = SQLiteAckQueue(
+            path=self.path, multithreading=True,
+            auto_commit=self.auto_commit
+        )
+
+        def producer():
+            for i in range(1000):
+                m_queue.put('var%d' % i)
+
+        def consumer():
+            for i in range(1000):
+                x = m_queue.get(block=True)
+                self.assertEqual('var%d' % i, x)
+
+        c = Thread(target=consumer)
+        c.start()
+        p = Thread(target=producer)
+        p.start()
+        p.join()
+        c.join()
+        self.assertEqual(0, m_queue.size)
+        self.assertEqual(0, len(m_queue))
+        self.assertRaises(Empty, m_queue.get, block=False)
+
+    def test_multi_threaded_multi_producer(self):
+        """Test sqlqueue can be used by multiple producers."""
+        queue = SQLiteAckQueue(
+            path=self.path, multithreading=True,
+            auto_commit=self.auto_commit
+        )
+
+        def producer(seq):
+            for i in range(10):
+                queue.put('var%d' % (i + (seq * 10)))
+
+        def consumer():
+            for _ in range(100):
+                data = queue.get(block=True)
+                self.assertTrue('var' in data)
+
+        c = Thread(target=consumer)
+        c.start()
+        producers = []
+        for seq in range(10):
+            t = Thread(target=producer, args=(seq,))
+            t.start()
+            producers.append(t)
+
+        for t in producers:
+            t.join()
+
+        c.join()
+
+    def test_multiple_consumers(self):
+        """Test sqlqueue can be used by multiple consumers."""
+
+        queue = SQLiteAckQueue(
+            path=self.path, multithreading=True,
+            auto_commit=self.auto_commit
+        )
+
+        def producer():
+            for x in range(1000):
+                queue.put('var%d' % x)
+
+        counter = []
+        # Set all to 0
+        for _ in range(1000):
+            counter.append(0)
+
+        def consumer(index):
+            for i in range(200):
+                data = queue.get(block=True)
+                self.assertTrue('var' in data)
+                counter[index * 200 + i] = data
+
+        p = Thread(target=producer)
+        p.start()
+        consumers = []
+        for index in range(5):
+            t = Thread(target=consumer, args=(index,))
+            t.start()
+            consumers.append(t)
+
+        p.join()
+        for t in consumers:
+            t.join()
+
+        self.assertEqual(0, queue.qsize())
+        for x in range(1000):
+            self.assertNotEqual(0, counter[x],
+                                "not 0 for counter's index %s" % x)
+
+    def test_protocol_1(self):
+        shutil.rmtree(self.path, ignore_errors=True)
+        q = SQLiteAckQueue(path=self.path)
+        self.assertEqual(q._serializer.protocol,
+                         2 if sys.version_info[0] == 2 else 4)
+
+    def test_protocol_2(self):
+        q = SQLiteAckQueue(path=self.path)
+        self.assertEqual(q._serializer.protocol,
+                         2 if sys.version_info[0] == 2 else 4)
+
+    def test_ack_and_clear(self):
+        q = SQLiteAckQueue(path=self.path)
+        q._MAX_ACKED_LENGTH = 10
+        ret_list = []
+        for _ in range(100):
+            q.put("val%s" % _)
+        for _ in range(100):
+            ret_list.append(q.get())
+        for ret in ret_list:
+            q.ack(ret)
+        self.assertEqual(q.acked_count(), 100)
+        q.clear_acked_data()
+        self.assertEqual(q.acked_count(), 10)
+        q.shrink_disk_usage()
+
+    def test_ack_unknown_item(self):
+        q = SQLiteAckQueue(path=self.path)
+        q.put("val1")
+        val1 = q.get()
+        q.ack("val2")
+        q.nack("val3")
+        q.ack_failed("val4")
+        self.assertEqual(q.qsize(), 0)
+        self.assertEqual(q.unack_count(), 1)
+        q.ack(val1)
+        self.assertEqual(q.unack_count(), 0)
+
+    def test_resume_unack(self):
+        q = SQLiteAckQueue(path=self.path)
+        q.put("val1")
+        val1 = q.get()
+        self.assertEqual(q.qsize(), 0)
+        self.assertEqual(q.unack_count(), 1)
+        self.assertEqual(q.ready_count(), 0)
+        del q
+
+        q = SQLiteAckQueue(path=self.path, auto_resume=False)
+        self.assertEqual(q.qsize(), 0)
+        self.assertEqual(q.unack_count(), 1)
+        self.assertEqual(q.ready_count(), 0)
+        q.resume_unack_tasks()
+        self.assertEqual(q.qsize(), 0)
+        self.assertEqual(q.unack_count(), 0)
+        self.assertEqual(q.ready_count(), 1)
+        self.assertEqual(val1, q.get())
+        del q
+
+        q = SQLiteAckQueue(path=self.path, auto_resume=True)
+        self.assertEqual(q.qsize(), 0)
+        self.assertEqual(q.unack_count(), 0)
+        self.assertEqual(q.ready_count(), 1)
+        self.assertEqual(val1, q.get())
+
+    def test_ack_unack_ack_failed(self):
+        q = SQLiteAckQueue(path=self.path)
+        q.put("val1")
+        q.put("val2")
+        q.put("val3")
+        val1 = q.get()
+        val2 = q.get()
+        val3 = q.get()
+        # qsize should be zero once every item has been fetched from the queue
+        self.assertEqual(q.qsize(), 0)
+        self.assertEqual(q.unack_count(), 3)
+        # nack requeues the item with ready status
+        q.nack(val1)
+        self.assertEqual(q.qsize(), 1)
+        self.assertEqual(q.ready_count(), 1)
+        # ack_failed just marks the item as ack-failed
+        q.ack_failed(val3)
+        self.assertEqual(q.ack_failed_count(), 1)
+        # ack should not affect qsize
+        q.ack(val2)
+        self.assertEqual(q.acked_count(), 1)
+        self.assertEqual(q.qsize(), 1)
+        # all ack*-related actions reduce the unack count
+        self.assertEqual(q.unack_count(), 0)
+        # re-get the nacked item
+        ready_val = q.get()
+        self.assertEqual(ready_val, val1)
+        q.ack(ready_val)
+        self.assertEqual(q.qsize(), 0)
+        self.assertEqual(q.acked_count(), 2)
+        self.assertEqual(q.ready_count(), 0)
+
+    def test_put_0(self):
+        q = SQLiteAckQueue(path=self.path)
+        q.put(0)
+        d = q.get(block=False)
+        self.assertIsNotNone(d)
+
+
+class SQLite3QueueInMemory(SQLite3AckQueueTest):
+    def setUp(self):
+        self.path = ":memory:"
+        self.auto_commit = True
+
+    def test_open_close_1000(self):
+        self.skipTest('Memory based sqlite is not persistent.')
+
+    def test_open_close_single(self):
+        self.skipTest('Memory based sqlite is not persistent.')
+
+    def test_multiple_consumers(self):
+        self.skipTest('Skipped due to occasional crash during '
+                      'multithreading mode.')
+
+    def test_multi_threaded_multi_producer(self):
+        self.skipTest('Skipped due to occasional crash during '
+                      'multithreading mode.')
+
+    def test_multi_threaded_parallel(self):
+        self.skipTest('Skipped due to occasional crash during '
+                      'multithreading mode.')
+
+    def test_task_done_with_restart(self):
+        self.skipTest('Skipped due to not persistent.')
+
+    def test_protocol_2(self):
+        self.skipTest('In memory queue is always new.')
+
+    def test_resume_unack(self):
+        self.skipTest('Memory based sqlite is not persistent.')
+
+
+class FILOSQLite3AckQueueTest(unittest.TestCase):
+    def setUp(self):
+        self.path = tempfile.mkdtemp(suffix='filo_sqlackqueue')
+        self.auto_commit = True
+
+    def tearDown(self):
+        shutil.rmtree(self.path, ignore_errors=True)
+
+    def test_open_close_1000(self):
+        """Write 1000 items, close, reopen checking if all items are there"""
+
+        q = FILOSQLiteAckQueue(self.path, auto_commit=self.auto_commit)
+        for i in range(1000):
+            q.put('var%d' % i)
+        self.assertEqual(1000, q.qsize())
+        del q
+        q = FILOSQLiteAckQueue(self.path)
+        self.assertEqual(1000, q.qsize())
+        for i in range(1000):
+            data = q.get()
+            self.assertEqual('var%d' % (999 - i), data)
+        # assert adding another one still works
+        q.put('foobar')
+        data = q.get()
+        self.assertEqual('foobar', data)
+
+
+class SQLite3UniqueAckQueueTest(unittest.TestCase):
+    def setUp(self):
+        self.path = tempfile.mkdtemp(suffix='sqlackqueue')
+        self.auto_commit = True
+
+    def test_add_duplicate_item(self):
+        q = UniqueAckQ(self.path)
+        q.put(1111)
+        self.assertEqual(1, q.size)
+        # put duplicate item
+        q.put(1111)
+        self.assertEqual(1, q.size)
+
+        q.put(2222)
+        self.assertEqual(2, q.size)
+
+        del q
+        q = UniqueAckQ(self.path)
+        self.assertEqual(2, q.size)
diff -Nru python-persist-queue-0.4.0/persistqueue/tests/test_sqlqueue.py python-persist-queue-0.5.1/persistqueue/tests/test_sqlqueue.py
--- python-persist-queue-0.4.0/persistqueue/tests/test_sqlqueue.py	1970-01-01 00:00:00.000000000 +0000
+++ python-persist-queue-0.5.1/persistqueue/tests/test_sqlqueue.py	2020-11-09 05:35:13.000000000 +0000
@@ -0,0 +1,424 @@
+# coding=utf-8
+
+import random
+import shutil
+import sys
+import tempfile
+import unittest
+from threading import Thread
+
+from persistqueue import SQLiteQueue, FILOSQLiteQueue, UniqueQ
+from persistqueue import Empty
+from persistqueue.serializers import json as serializers_json
+from persistqueue.serializers import pickle as serializers_pickle
+from persistqueue.serializers import msgpack as serializers_msgpack
+
+
+class SQLite3QueueTest(unittest.TestCase):
+    def setUp(self):
+        self.path = tempfile.mkdtemp(suffix='sqlqueue')
+        self.auto_commit = True
+
+    def tearDown(self):
+        shutil.rmtree(self.path, ignore_errors=True)
+
+    def test_raise_empty(self):
+        q = SQLiteQueue(self.path, auto_commit=self.auto_commit)
+
+        q.put('first')
+        d = q.get()
+        self.assertEqual('first', d)
+        self.assertRaises(Empty, q.get, block=False)
+
+        # assert with timeout
+        self.assertRaises(Empty, q.get, block=True, timeout=1.0)
+        # assert with negative timeout
+        self.assertRaises(ValueError, q.get, block=True, timeout=-1.0)
+        del q
+
+    def test_empty(self):
+        q = SQLiteQueue(self.path, auto_commit=self.auto_commit)
+        self.assertEqual(q.empty(), True)
+
+        q.put('first')
+        self.assertEqual(q.empty(), False)
+
+        q.get()
+        self.assertEqual(q.empty(), True)
+
+    def test_open_close_single(self):
+        """Write 1 item, close, reopen checking if same item is there"""
+
+        q = SQLiteQueue(self.path, auto_commit=self.auto_commit)
+        q.put(b'var1')
+        del q
+        q = SQLiteQueue(self.path)
+        self.assertEqual(1, q.qsize())
+        self.assertEqual(b'var1', q.get())
+
+    def test_open_close_1000(self):
+        """Write 1000 items, close, reopen checking if all items are there"""
+
+        q = SQLiteQueue(self.path, auto_commit=self.auto_commit)
+        for i in range(1000):
+            q.put('var%d' % i)
+
+        self.assertEqual(1000, q.qsize())
+        del q
+        q = SQLiteQueue(self.path)
+        self.assertEqual(1000, q.qsize())
+        for i in range(1000):
+            data = q.get()
+            self.assertEqual('var%d' % i, data)
+        # assert adding another one still works
+        q.put('foobar')
+        data = q.get()
+        self.assertEqual('foobar', data)
+
+    def test_random_read_write(self):
+        """Test random read/write"""
+
+        q = SQLiteQueue(self.path, auto_commit=self.auto_commit)
+        n = 0
+        for _ in range(1000):
+            if random.random() < 0.5:
+                if n > 0:
+                    q.get()
+                    n -= 1
+                else:
+                    self.assertRaises(Empty, q.get, block=False)
+            else:
+                q.put('var%d' % random.getrandbits(16))
+                n += 1
+
+    def test_multi_threaded_parallel(self):
+        """Create consumer and producer threads, check parallelism"""
+
+        # self.skipTest("Not supported multi-thread.")
+
+        m_queue = SQLiteQueue(path=self.path, multithreading=True,
+                              auto_commit=self.auto_commit)
+
+        def producer():
+            for i in range(1000):
+                m_queue.put('var%d' % i)
+
+        def consumer():
+            for i in range(1000):
+                x = m_queue.get(block=True)
+                self.assertEqual('var%d' % i, x)
+
+        c = Thread(target=consumer)
+        c.start()
+        p = Thread(target=producer)
+        p.start()
+        p.join()
+        c.join()
+        self.assertEqual(0, m_queue.size)
+        self.assertEqual(0, len(m_queue))
+        self.assertRaises(Empty, m_queue.get, block=False)
+
+    def test_multi_threaded_multi_producer(self):
+        """Test sqlqueue can be used by multiple producers."""
+        queue = SQLiteQueue(path=self.path, multithreading=True,
+                            auto_commit=self.auto_commit)
+
+        def producer(seq):
+            for i in range(10):
+                queue.put('var%d' % (i + (seq * 10)))
+
+        def consumer():
+            for _ in range(100):
+                data = queue.get(block=True)
+                self.assertTrue('var' in data)
+
+        c = Thread(target=consumer)
+        c.start()
+        producers = []
+        for seq in range(10):
+            t = Thread(target=producer, args=(seq,))
+            t.start()
+            producers.append(t)
+
+        for t in producers:
+            t.join()
+
+        c.join()
+
+    def test_multiple_consumers(self):
+        """Test sqlqueue can be used by multiple consumers."""
+
+        queue = SQLiteQueue(path=self.path, multithreading=True,
+                            auto_commit=self.auto_commit)
+
+        def producer():
+            for x in range(1000):
+                queue.put('var%d' % x)
+
+        counter = []
+        # Set all to 0
+        for _ in range(1000):
+            counter.append(0)
+
+        def consumer(index):
+            for i in range(200):
+                data = queue.get(block=True)
+                self.assertTrue('var' in data)
+                counter[index * 200 + i] = data
+
+        p = Thread(target=producer)
+        p.start()
+        consumers = []
+        for index in range(5):
+            t = Thread(target=consumer, args=(index,))
+            t.start()
+            consumers.append(t)
+
+        p.join()
+        for t in consumers:
+            t.join()
+
+        self.assertEqual(0, queue.qsize())
+        for x in range(1000):
+            self.assertNotEqual(0, counter[x],
+                                "not 0 for counter's index %s" % x)
+
+        self.assertEqual(len(set(counter)), len(counter))
+
+    def test_task_done_with_restart(self):
+        """Test that items are not deleted before task_done."""
+
+        q = SQLiteQueue(path=self.path, auto_commit=False)
+
+        for i in range(1, 11):
+            q.put(i)
+
+        self.assertEqual(1, q.get())
+        self.assertEqual(2, q.get())
+        # size is correct before task_done
+        self.assertEqual(8, q.qsize())
+        q.task_done()
+        # make sure the size is still correct
+        self.assertEqual(8, q.qsize())
+
+        self.assertEqual(3, q.get())
+        # without task done
+        del q
+        q = SQLiteQueue(path=self.path, auto_commit=False)
+        # After restart, the qsize and head item are the same
+        self.assertEqual(8, q.qsize())
+        # After restart, the queue still works
+        self.assertEqual(3, q.get())
+        self.assertEqual(7, q.qsize())
+
+    def test_protocol_1(self):
+        shutil.rmtree(self.path, ignore_errors=True)
+        q = SQLiteQueue(path=self.path)
+        self.assertEqual(q._serializer.protocol,
+                         2 if sys.version_info[0] == 2 else 4)
+
+    def test_protocol_2(self):
+        q = SQLiteQueue(path=self.path)
+        self.assertEqual(q._serializer.protocol,
+                         2 if sys.version_info[0] == 2 else 4)
+
+    def test_json_serializer(self):
+        q = SQLiteQueue(
+            path=self.path,
+            serializer=serializers_json)
+        x = dict(
+            a=1,
+            b=2,
+            c=dict(
+                d=list(range(5)),
+                e=[1]
+            ))
+        q.put(x)
+        self.assertEquals(q.get(), x)
+
+    def test_put_0(self):
+        q = SQLiteQueue(path=self.path)
+        q.put(0)
+        d = q.get(block=False)
+        self.assertIsNotNone(d)
+
+
+class SQLite3QueueNoAutoCommitTest(SQLite3QueueTest):
+    def setUp(self):
+        self.path = tempfile.mkdtemp(suffix='sqlqueue_auto_commit')
+        self.auto_commit = False
+
+    def test_multiple_consumers(self):
+        """
+        FAIL: test_multiple_consumers (
+        -tests.test_sqlqueue.SQLite3QueueNoAutoCommitTest)
+        Test sqlqueue can be used by multiple consumers.
+        ----------------------------------------------------------------------
+        Traceback (most recent call last):
+          File "persist-queue\tests\test_sqlqueue.py", line 183,
+          -in test_multiple_consumers
+            self.assertEqual(0, queue.qsize())
+        AssertionError: 0 != 72
+        :return:
+        """
+        self.skipTest('Skipped due to a known bug above.')
+
+
+class SQLite3QueueInMemory(SQLite3QueueTest):
+    def setUp(self):
+        self.path = ":memory:"
+        self.auto_commit = True
+
+    def test_open_close_1000(self):
+        self.skipTest('Memory based sqlite is not persistent.')
+
+    def test_open_close_single(self):
+        self.skipTest('Memory based sqlite is not persistent.')
+
+    def test_multiple_consumers(self):
+        self.skipTest('Skipped due to occasional crash during '
+                      'multithreading mode.')
+
+    def test_multi_threaded_multi_producer(self):
+        self.skipTest('Skipped due to occasional crash during '
+                      'multithreading mode.')
+
+    def test_multi_threaded_parallel(self):
+        self.skipTest('Skipped due to occasional crash during '
+                      'multithreading mode.')
+
+    def test_task_done_with_restart(self):
+        self.skipTest('Skipped due to not persistent.')
+
+    def test_protocol_2(self):
+        self.skipTest('In memory queue is always new.')
+
+
+class FILOSQLite3QueueTest(unittest.TestCase):
+    def setUp(self):
+        self.path = tempfile.mkdtemp(suffix='filo_sqlqueue')
+        self.auto_commit = True
+
+    def tearDown(self):
+        shutil.rmtree(self.path, ignore_errors=True)
+
+    def test_open_close_1000(self):
+        """Write 1000 items, close, reopen checking if all items are there"""
+
+        q = FILOSQLiteQueue(self.path, auto_commit=self.auto_commit)
+        for i in range(1000):
+            q.put('var%d' % i)
+        self.assertEqual(1000, q.qsize())
+        del q
+        q = FILOSQLiteQueue(self.path)
+        self.assertEqual(1000, q.qsize())
+        for i in range(1000):
+            data = q.get()
+            self.assertEqual('var%d' % (999 - i), data)
+        # assert adding another one still works
+        q.put('foobar')
+        data = q.get()
+        self.assertEqual('foobar', data)
+
+
+class FILOSQLite3QueueNoAutoCommitTest(FILOSQLite3QueueTest):
+    def setUp(self):
+        self.path = tempfile.mkdtemp(suffix='filo_sqlqueue_auto_commit')
+        self.auto_commit = False
+
+
+class SQLite3UniqueQueueTest(unittest.TestCase):
+    def setUp(self):
+        self.path = tempfile.mkdtemp(suffix='sqlqueue')
+        self.auto_commit = True
+
+    def test_add_duplicate_item(self):
+        q = UniqueQ(self.path)
+        q.put(1111)
+        self.assertEqual(1, q.size)
+        # put duplicate item
+        q.put(1111)
+        self.assertEqual(1, q.size)
+
+        q.put(2222)
+        self.assertEqual(2, q.size)
+
+        del q
+        q = UniqueQ(self.path)
+        self.assertEqual(2, q.size)
+
+    def test_multiple_consumers(self):
+        """Test UniqueQ can be used by multiple consumers."""
+
+        queue = UniqueQ(path=self.path, multithreading=True,
+                        auto_commit=self.auto_commit)
+
+        def producer():
+            for x in range(1000):
+                queue.put('var%d' % x)
+
+        counter = []
+        # Set all to 0
+        for _ in range(1000):
+            counter.append(0)
+
+        def consumer(index):
+            for i in range(200):
+                data = queue.get(block=True)
+                self.assertTrue('var' in data)
+                counter[index * 200 + i] = data
+
+        p = Thread(target=producer)
+        p.start()
+        consumers = []
+        for index in range(5):
+            t = Thread(target=consumer, args=(index,))
+            t.start()
+            consumers.append(t)
+
+        p.join()
+        for t in consumers:
+            t.join()
+
+        self.assertEqual(0, queue.qsize())
+        for x in range(1000):
+            self.assertNotEqual(0, counter[x],
+                                "not 0 for counter's index %s" % x)
+
+        self.assertEqual(len(set(counter)), len(counter))
+
+    def test_unique_dictionary_serialization_pickle(self):
+        queue = UniqueQ(
+            path=self.path,
+            multithreading=True,
+            auto_commit=self.auto_commit,
+            serializer=serializers_pickle,
+        )
+        queue.put({"foo": 1, "bar": 2})
+        self.assertEqual(queue.total, 1)
+        queue.put({"bar": 2, "foo": 1})
+        self.assertEqual(queue.total, 1)
+
+    def test_unique_dictionary_serialization_msgpack(self):
+        queue = UniqueQ(
+            path=self.path,
+            multithreading=True,
+            auto_commit=self.auto_commit,
+            serializer=serializers_msgpack
+        )
+        queue.put({"foo": 1, "bar": 2})
+        self.assertEqual(queue.total, 1)
+        queue.put({"bar": 2, "foo": 1})
+        self.assertEqual(queue.total, 1)
+
+    def test_unique_dictionary_serialization_json(self):
+        queue = UniqueQ(
+            path=self.path,
+            multithreading=True,
+            auto_commit=self.auto_commit,
+            serializer=serializers_json
+        )
+        queue.put({"foo": 1, "bar": 2})
+        self.assertEqual(queue.total, 1)
+        queue.put({"bar": 2, "foo": 1})
+        self.assertEqual(queue.total, 1)
diff -Nru python-persist-queue-0.4.0/persist_queue.egg-info/dependency_links.txt python-persist-queue-0.5.1/persist_queue.egg-info/dependency_links.txt
--- python-persist-queue-0.4.0/persist_queue.egg-info/dependency_links.txt	2018-06-17 12:52:23.000000000 +0000
+++ python-persist-queue-0.5.1/persist_queue.egg-info/dependency_links.txt	1970-01-01 00:00:00.000000000 +0000
@@ -1 +0,0 @@
-
diff -Nru python-persist-queue-0.4.0/persist_queue.egg-info/PKG-INFO python-persist-queue-0.5.1/persist_queue.egg-info/PKG-INFO
--- python-persist-queue-0.4.0/persist_queue.egg-info/PKG-INFO	2018-06-17 12:52:23.000000000 +0000
+++ python-persist-queue-0.5.1/persist_queue.egg-info/PKG-INFO	1970-01-01 00:00:00.000000000 +0000
@@ -1,403 +0,0 @@
-Metadata-Version: 1.2
-Name: persist-queue
-Version: 0.4.0
-Summary: A thread-safe disk based persistent queue in Python.
-Home-page: http://github.com/peter-wangxu/persist-queue
-Author: Peter Wang
-Author-email: wangxu198709@gmail.com
-Maintainer: Peter Wang
-Maintainer-email: wangxu198709@gmail.com
-License: BSD
-Description: persist-queue - A thread-safe, disk-based queue for Python
-        ==========================================================
-
-        .. image:: https://img.shields.io/circleci/project/github/peter-wangxu/persist-queue/master.svg?label=Linux%20%26%20Mac
-           :target: https://circleci.com/gh/peter-wangxu/persist-queue
-
-        .. image:: https://img.shields.io/appveyor/ci/peter-wangxu/persist-queue/master.svg?label=Windows
-           :target: https://ci.appveyor.com/project/peter-wangxu/persist-queue
-
-        .. image:: https://img.shields.io/codecov/c/github/peter-wangxu/persist-queue/master.svg
-           :target: https://codecov.io/gh/peter-wangxu/persist-queue
-
-        .. image:: https://img.shields.io/pypi/v/persist-queue.svg
-           :target: https://pypi.python.org/pypi/persist-queue
-
-        ``persist-queue`` implements a file-based queue and a series of sqlite3-based queues. The goal is to achieve the following requirements:
-
-        * Disk-based: each queued item should be stored on disk in case of any crash.
-        * Thread-safe: can be used by multi-threaded producers and multi-threaded consumers.
-        * Recoverable: Items can be read after process restart.
-        * Green-compatible: can be used in ``greenlet`` or ``eventlet`` environment.
-
-        Neither *queuelib* nor *python-pqueue* fulfils all of the above. After some experimentation, I found it hard to achieve this on top of their current
-        implementations without huge code changes; this is the motivation for starting this project.
-
-        *persist-queue* uses the *pickle* object serialization module to support object instances.
-        Most built-in types, like `int`, `dict` and `list`, can be persisted by `persist-queue` directly; to support customized objects,
-        please refer to `Pickling and unpickling extension types(Python2) <https://docs.python.org/2/library/pickle.html#pickling-and-unpickling-normal-class-instances>`_
-        and `Pickling Class Instances(Python3) <https://docs.python.org/3/library/pickle.html#pickling-class-instances>`_
-
-        This project is based on the achievements of `python-pqueue <https://github.com/balena/python-pqueue>`_
-        and `queuelib <https://github.com/scrapy/queuelib>`_
-
-        Requirements
-        ------------
-        * Python 2.7 or Python 3.x.
-        * Full support for Linux.
-        * Windows support (with `Caution`_ if ``persistqueue.Queue`` is used).
-
-        Installation
-        ------------
-
-        from pypi
-        ^^^^^^^^^
-
-        .. code-block:: console
-
-            pip install persist-queue
-
-        from source code
-        ^^^^^^^^^^^^^^^^
-
-        .. code-block:: console
-
-            git clone https://github.com/peter-wangxu/persist-queue
-            cd persist-queue
-            python setup.py install
-
-
-        Benchmark
-        ---------
-
-        Here are the results for writing/reading **1000** items to disk, comparing the sqlite3 and file queues.
-
-        - Windows
-          - OS: Windows 10
-          - Disk: SATA3 SSD
-          - RAM: 16 GiB
-
-        +---------------+---------+-------------------------+----------------------------+
-        |               | Write   | Write/Read(1 task_done) | Write/Read(many task_done) |
-        +---------------+---------+-------------------------+----------------------------+
-        | SQLite3 Queue | 1.8880  | 2.0290                  | 3.5940                     |
-        +---------------+---------+-------------------------+----------------------------+
-        | File Queue    | 15.0550 | 15.9150                 | 30.7650                    |
-        +---------------+---------+-------------------------+----------------------------+
-
-        - Linux
-          - OS: Ubuntu 16.04 (VM)
-          - Disk: SATA3 SSD
-          - RAM: 4 GiB
-
-        +---------------+--------+-------------------------+----------------------------+
-        |               | Write  | Write/Read(1 task_done) | Write/Read(many task_done) |
-        +---------------+--------+-------------------------+----------------------------+
-        | SQLite3 Queue | 1.8282 | 1.8075                  | 2.8639                     |
-        +---------------+--------+-------------------------+----------------------------+
-        | File Queue    | 0.9123 | 1.0411                  | 2.5104                     |
-        +---------------+--------+-------------------------+----------------------------+
-
-
-        **note** The above results were obtained with:
-
-        .. code-block:: console
-
-            python benchmark/run_benchmark.py 1000
-
-
-        To see the real performance on your host, run the script under ``benchmark/run_benchmark.py``:
-
-        .. code-block:: console
-
-            python benchmark/run_benchmark.py
-
-
-        Examples
-        --------
-
-
-        Example usage with a SQLite3 based queue
-        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-        .. code-block:: python
-
-            >>> import persistqueue
-            >>> q = persistqueue.SQLiteQueue('mypath', auto_commit=True)
-            >>> q.put('str1')
-            >>> q.put('str2')
-            >>> q.put('str3')
-            >>> q.get()
-            'str1'
-            >>> del q
-
-
-        Close the console, and then recreate the queue:
-
-        .. code-block:: python
-
-            >>> import persistqueue
-            >>> q = persistqueue.SQLiteQueue('mypath', auto_commit=True)
-            >>> q.get()
-            'str2'
-            >>>
-
-
-        Example usage of SQLite3 based ``UniqueQ``
-        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-        This queue does not allow duplicate items.
-
-        .. code-block:: python
-
-            >>> import persistqueue
-            >>> q = persistqueue.UniqueQ('mypath')
-            >>> q.put('str1')
-            >>> q.put('str1')
-            >>> q.size
-            1
-            >>> q.put('str2')
-            >>> q.size
-            2
-            >>>
-
-        Example usage of SQLite3 based ``SQLiteAckQueue``
-        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-        The core functions:
-        ``get``: get from queue and mark item as unack
-        ``ack``: mark item as acked
-        ``nack``: there might be something wrong with the current consumer, so mark the item as ready; a new consumer will get it
-        ``ack_failed``: there might be something wrong during processing, so just mark the item as failed.
-
-        .. code-block:: python
-
-            >>> import persistqueue
-            >>> ackq = persistqueue.SQLiteAckQueue('path')
-            >>> ackq.put('str1')
-            >>> item = ackq.get()
-            >>> # Do something with the item
-            >>> ackq.ack(item)  # If done with the item
-            >>> ackq.nack(item)  # Else mark item as `nack` so that it can be processed again by any worker
-            >>> ackq.ack_failed(item)  # Or else mark item as `ack_failed` to discard this item
-
-
-        Note: this queue does not support ``auto_commit=True``
-
-        Example usage with a file based queue
-        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-        .. code-block:: python
-
-            >>> from persistqueue import Queue
-            >>> q = Queue("mypath")
-            >>> q.put('a')
-            >>> q.put('b')
-            >>> q.put('c')
-            >>> q.get()
-            'a'
-            >>> q.task_done()
-
-        Close the Python console, and then restart the queue from the same path:
-
-        .. code-block:: python
-
-            >>> from persistqueue import Queue
-            >>> q = Queue('mypath')
-            >>> q.get()
-            'b'
-            >>> q.task_done()
-
-
-        Example usage with a SQLite3 based dict
-        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-        .. code-block:: python
-
-            >>> from persistqueue import PDict
-            >>> q = PDict("testpath", "testname")
-            >>> q['key1'] = 123
-            >>> q['key2'] = 321
-            >>> q['key1']
-            123
-            >>> len(q)
-            2
-            >>> del q['key1']
-            >>> q['key1']
-            Traceback (most recent call last):
-              File "<stdin>", line 1, in <module>
-              File "persistqueue\pdict.py", line 58, in __getitem__
-                raise KeyError('Key: {} not exists.'.format(item))
-            KeyError: 'Key: key1 not exists.'
-
-        Close the console and restart the PDict:
-
-
-        .. code-block:: python
-
-            >>> from persistqueue import PDict
-            >>> q = PDict("testpath", "testname")
-            >>> q['key2']
-            321
-
-
-        Multi-thread usage for **SQLite3** based queue
-        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-        .. code-block:: python
-
-            from persistqueue import FIFOSQLiteQueue
-
-            q = FIFOSQLiteQueue(path="./test", multithreading=True)
-
-            def worker():
-                while True:
-                    item = q.get()
-                    do_work(item)
-
-            for i in range(num_worker_threads):
-                t = Thread(target=worker)
-                t.daemon = True
-                t.start()
-
-            for item in source():
-                q.put(item)
-
-
-        multi-thread usage for **Queue**
-        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-        .. code-block:: python
-
-            from persistqueue import Queue
-
-            q = Queue()
-
-            def worker():
-                while True:
-                    item = q.get()
-                    do_work(item)
-                    q.task_done()
-
-            for i in range(num_worker_threads):
-                t = Thread(target=worker)
-                t.daemon = True
-                t.start()
-
-            for item in source():
-                q.put(item)
-
-            q.join()  # block until all tasks are done
-
-
-        Tips
-        ----
-
-        ``task_done`` is required both for the file-based queue and the SQLite3-based queue (when ``auto_commit=False``)
-        to persist the cursor of the next ``get`` to the disk.
-
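A compact illustration of this tip for the file-based queue (a sketch along
the lines of the examples above; "mypath" is a placeholder):

.. code-block:: python

    >>> from persistqueue import Queue
    >>> q = Queue("mypath")
    >>> q.put('a')
    >>> q.get()
    'a'
    >>> # without task_done() here, 'a' would be delivered again on restart
    >>> q.task_done()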
- - Multi-thread usage for **SQLite3** based queue - ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - - .. code-block:: python - - from threading import Thread - - from persistqueue import FIFOSQLiteQueue - - q = FIFOSQLiteQueue(path="./test", multithreading=True) - - def worker(): - while True: - item = q.get() - do_work(item) - - for i in range(num_worker_threads): - t = Thread(target=worker) - t.daemon = True - t.start() - - for item in source(): - q.put(item) - - - Multi-thread usage for **Queue** - ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - - .. code-block:: python - - from threading import Thread - - from persistqueue import Queue - - q = Queue() - - def worker(): - while True: - item = q.get() - do_work(item) - q.task_done() - - for i in range(num_worker_threads): - t = Thread(target=worker) - t.daemon = True - t.start() - - for item in source(): - q.put(item) - - q.join() # block until all tasks are done - - - Tips - ---- - - ``task_done`` is required both for the file based queue and the SQLite3 based queue (when ``auto_commit=False``) - to persist the cursor of the next ``get`` to the disk. - - - Performance impact - ------------------ - - - **WAL** - - Starting with v0.3.2, ``persistqueue`` leverages the sqlite3 builtin feature - ``WAL ``, which can improve the performance - significantly; general testing indicates that ``persistqueue`` is 2-4 times - faster than the previous version. - - - **auto_commit=False** - - Since persistqueue v0.3.0, a new parameter ``auto_commit`` is introduced to tweak - the performance for sqlite3 based queues as needed. When ``auto_commit=False`` is specified, the user - needs to call ``queue.task_done()`` to persist the changes made to the disk since the - last ``task_done`` invocation. - - - **pickle protocol selection** - - From v0.3.6 on, ``persistqueue`` selects pickle ``Protocol version 2`` for Python 2 and ``Protocol version 4`` for Python 3 - respectively. This selection only happens when the queue directory is not yet present when initializing the queue. - - Tests - ----- - - *persist-queue* uses ``tox`` to run tests. - - - Unit test - - .. code-block:: console - - tox -e <env> - - Available ``<env>`` values: ``py27``, ``py34``, ``py35``, ``py36``, ``py37`` - - - - PEP8 check - - .. code-block:: console - - tox -e pep8 - - - `pyenv `_ is usually a helpful tool to manage multiple versions of Python. - - Caution - ------- - - Currently, the atomic operation is not supported on Windows due to the limitation of Python's `os.rename `_. - That is to say, the data in ``persistqueue.Queue`` could be left in an unreadable state when an incidental failure occurs during ``Queue.task_done``. - - **DO NOT put any critical data on persistqueue.queue on Windows**. - - This issue is tracked by `Atomic renames on windows `_ - - Contribution - ------------ - - Simply fork this repo and send a PR with your code change (plus tests to cover your change); remember to give your PR a title and description. I am willing to - enhance this project with you :). - - - License - ------- - - `BSD `_ - - Contributors - ------------ - - `Contributors `_ - - FAQ - --- - - * ``sqlite3.OperationalError: database is locked`` is raised. - - persistqueue opens 2 connections to the db if ``multithreading=True``; the - SQLite database is locked until the ongoing transaction is committed. The ``timeout`` - parameter specifies how long the connection should wait for the lock to go away - before raising an exception. The default is **10** seconds; increase ``timeout`` - when creating the queue if the above error occurs.
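If the error persists under heavy multi-threaded load, a larger ``timeout`` can be passed when creating the queue. A minimal sketch; the value ``30`` is illustrative:

.. code-block:: python

    import persistqueue

    # Wait up to 30 seconds (instead of the default 10) for the
    # SQLite lock to go away before raising OperationalError.
    q = persistqueue.SQLiteQueue('mypath', multithreading=True, timeout=30)
    q.put('item1')
    print(q.get())  # 'item1'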
- - * sqlite3 based queues are not thread-safe. - - The sqlite3 queues are heavily tested in a multi-threading environment; if you find one is not thread-safe, please - make sure you set ``multithreading=True`` when initializing the queue before submitting a new issue:). - - -Platform: all -Classifier: Development Status :: 4 - Beta -Classifier: Operating System :: OS Independent -Classifier: Intended Audience :: Developers -Classifier: License :: OSI Approved :: BSD License -Classifier: Programming Language :: Python -Classifier: Programming Language :: Python :: Implementation -Classifier: Programming Language :: Python :: 2 -Classifier: Programming Language :: Python :: 2.7 -Classifier: Programming Language :: Python :: 3 -Classifier: Programming Language :: Python :: 3.4 -Classifier: Programming Language :: Python :: 3.5 -Classifier: Programming Language :: Python :: 3.6 -Classifier: Programming Language :: Python :: 3.7 -Classifier: Topic :: Software Development :: Libraries diff -Nru python-persist-queue-0.4.0/persist_queue.egg-info/SOURCES.txt python-persist-queue-0.5.1/persist_queue.egg-info/SOURCES.txt --- python-persist-queue-0.4.0/persist_queue.egg-info/SOURCES.txt 2018-06-17 12:52:24.000000000 +0000 +++ python-persist-queue-0.5.1/persist_queue.egg-info/SOURCES.txt 1970-01-01 00:00:00.000000000 +0000 @@ -1,24 +0,0 @@ -LICENSE -MANIFEST.in -README.rst -requirements.txt -setup.cfg -setup.py -test-requirements.txt -persist_queue.egg-info/PKG-INFO -persist_queue.egg-info/SOURCES.txt -persist_queue.egg-info/dependency_links.txt -persist_queue.egg-info/top_level.txt -persistqueue/__init__.py -persistqueue/common.py -persistqueue/exceptions.py -persistqueue/pdict.py -persistqueue/queue.py -persistqueue/sqlackqueue.py -persistqueue/sqlbase.py -persistqueue/sqlqueue.py -tests/__init__.py -tests/test_pdict.py -tests/test_queue.py -tests/test_sqlackqueue.py -tests/test_sqlqueue.py \ No newline at end of file diff -Nru python-persist-queue-0.4.0/persist_queue.egg-info/top_level.txt python-persist-queue-0.5.1/persist_queue.egg-info/top_level.txt --- python-persist-queue-0.4.0/persist_queue.egg-info/top_level.txt 2018-06-17 12:52:23.000000000 +0000 +++ python-persist-queue-0.5.1/persist_queue.egg-info/top_level.txt 1970-01-01 00:00:00.000000000 +0000 @@ -1,2 +0,0 @@ -persistqueue -tests diff -Nru python-persist-queue-0.4.0/PKG-INFO python-persist-queue-0.5.1/PKG-INFO --- python-persist-queue-0.4.0/PKG-INFO 2018-06-17 12:52:24.000000000 +0000 +++ python-persist-queue-0.5.1/PKG-INFO 1970-01-01 00:00:00.000000000 +0000 @@ -1,403 +0,0 @@ -Metadata-Version: 1.2 -Name: persist-queue -Version: 0.4.0 -Summary: A thread-safe disk based persistent queue in Python. -Home-page: http://github.com/peter-wangxu/persist-queue -Author: Peter Wang -Author-email: wangxu198709@gmail.com -Maintainer: Peter Wang -Maintainer-email: wangxu198709@gmail.com -License: BSD -Description: persist-queue - A thread-safe, disk-based queue for Python
- - -Platform: all -Classifier: Development Status :: 4 - Beta -Classifier: Operating System :: OS Independent -Classifier: Intended Audience :: Developers -Classifier: License :: OSI Approved :: BSD License -Classifier: Programming Language :: Python -Classifier: Programming Language :: Python :: Implementation -Classifier: Programming Language :: Python :: 2 -Classifier: Programming Language :: Python :: 2.7 -Classifier: Programming Language :: Python :: 3 -Classifier: Programming Language :: Python :: 3.4 -Classifier: Programming Language :: Python :: 3.5 -Classifier: Programming Language :: Python :: 3.6 -Classifier: Programming Language :: Python :: 3.7 -Classifier: Topic :: Software Development :: Libraries diff -Nru python-persist-queue-0.4.0/README.rst python-persist-queue-0.5.1/README.rst --- python-persist-queue-0.4.0/README.rst 2018-06-17 12:36:30.000000000 +0000 +++ python-persist-queue-0.5.1/README.rst 2020-11-09 05:35:13.000000000 +0000 @@ -23,7 +23,7 @@ While *queuelib* and *python-pqueue* cannot fulfil all of above. After some try, I found it's hard to achieve based on their current implementation without huge code change. this is the motivation to start this project. -*persist-queue* use *pickle* object serialization module to support object instances. +By default, *persist-queue* uses the *pickle* object serialization module to support object instances. Most built-in type, like `int`, `dict`, `list` are able to be persisted by `persist-queue` directly, to support customized objects, please refer to `Pickling and unpickling extension types(Python2) `_ and `Pickling Class Instances(Python3) `_ @@ -31,12 +31,30 @@ This project is based on the achievements of `python-pqueue `_ and `queuelib `_ +Slack channels +^^^^^^^^^^^^^^ + +Join the `persist-queue `_ channel + + Requirements ------------ * Python 2.7 or Python 3.x. * Full support for Linux. * Windows support (with `Caution`_ if ``persistqueue.Queue`` is used). +Features +-------- + +- Multiple platform support: Linux, macOS, Windows +- Pure Python +- Both file-based and sqlite3-based queues are supported +- File-based queue: multiple serialization protocols supported: pickle (default), msgpack, json + + Installation ------------ @@ -46,6 +64,9 @@ .. code-block:: console pip install persist-queue + # for msgpack support, use the following command + pip install persist-queue[extra] + from source code ^^^^^^^^^^^^^^^^ @@ -54,13 +75,15 @@ git clone https://github.com/peter-wangxu/persist-queue cd persist-queue + # for msgpack support, run 'pip install -r extra-requirements.txt' first python setup.py install Benchmark --------- -Here are the results for writing/reading **1000** items to the disk comparing the sqlite3 and file queue. +Here is the time spent (in seconds) for writing/reading **1000** items to +disk, comparing the sqlite3 and file queues.
- Windows - - OS: Windows 10 @@ -72,9 +95,13 @@ +---------------+---------+-------------------------+----------------------------+ | SQLite3 Queue | 1.8880 | 2.0290 | 3.5940 | +---------------+---------+-------------------------+----------------------------+ -| File Queue | 15.0550 | 15.9150 | 30.7650 | +| File Queue | 4.9520 | 5.0560 | 8.4900 | +---------------+---------+-------------------------+----------------------------+ +**Windows note** +Performance of the Windows file queue has improved dramatically since `v0.4.1`, due to the +atomic renaming support (3-4X faster) + - Linux - OS: Ubuntu 16.04 (VM) - Disk: SATA3 SSD - RAM: 4 GiB @@ -88,8 +115,24 @@ | File Queue | 0.9123 | 1.0411 | 2.5104 | +---------------+--------+-------------------------+----------------------------+ +- Mac OS + - OS: 10.14 (macOS Mojave) + - Disk: PCIe SSD + - RAM: 16 GiB -**note** ++---------------+--------+-------------------------+----------------------------+ +| | Write | Write/Read(1 task_done) | Write/Read(many task_done) | ++---------------+--------+-------------------------+----------------------------+ +| SQLite3 Queue | 0.1879 | 0.2115 | 0.3147 | ++---------------+--------+-------------------------+----------------------------+ +| File Queue | 0.5158 | 0.5357 | 1.0446 | ++---------------+--------+-------------------------+----------------------------+ + +**note** + +- The values above are in seconds for reading/writing *1000* items; the lower, + the better +- The results above were obtained by running: .. code-block:: console python benchmark/run_benchmark.py 1000 @@ -150,28 +193,35 @@ 2 >>> -Example usage of SQLite3 based ``SQLiteAckQueue`` -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Example usage of SQLite3 based ``SQLiteAckQueue``/``UniqueAckQ`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ The core functions: -``get``: get from queue and mark item as unack -``ack``: mark item as acked -``nack``: there might be something wrong with current consumer, so mark item as ready and new consumer will get it -``ack_failed``: there might be something wrong during process, so just mark item as failed. + +- ``get``: get an item from the queue and mark it as unack +- ``ack``: mark the item as acked +- ``nack``: there might be something wrong with the current consumer, so mark the item as ready again and a new consumer will get it +- ``ack_failed``: there might be something wrong during processing, so just mark the item as failed +- ``clear_acked_data``: perform a SQL delete against sqlite; it removes acked items, keeping only the latest 1000 whose status is ``AckStatus.acked`` (note: this does not shrink the file size on disk) +- ``shrink_disk_usage``: perform a ``VACUUM`` against the sqlite database and rebuild the database file; this usually takes a long time and frees a lot of disk space after ``clear_acked_data`` .. code-block:: python - >>> import persisitqueue + >>> import persistqueue >>> ackq = persistqueue.SQLiteAckQueue('path') >>> ackq.put('str1') >>> item = ackq.get() >>> # Do something with the item >>> ackq.ack(item) # If done with the item >>> ackq.nack(item) # Else mark item as `nack` so that it can be proceeded again by any worker - >>> ackq.ack_failed() # Or else mark item as `ack_failed` to discard this item + >>> ackq.ack_failed(item) # Or else mark item as `ack_failed` to discard this item -Note: this queue does not support ``auto_commit=True`` +Note: + +1. The SQLiteAckQueue always uses "auto_commit=True". +2. The queue can be used in non-blocking style, e.g. "SQLiteAckQueue.get(block=False, timeout=5)". +3. ``UniqueAckQ`` only allows for unique items
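The two maintenance functions above are typically used together once many items have been acknowledged. A minimal sketch of such a cleanup pass:

.. code-block:: python

    import persistqueue

    ackq = persistqueue.SQLiteAckQueue('path')
    ackq.put('str1')
    item = ackq.get()
    ackq.ack(item)  # the item is now in AckStatus.acked

    # Delete old acked rows (the most recent acked items are kept);
    # this does not shrink the database file on disk ...
    ackq.clear_acked_data()
    # ... so rebuild the file with a VACUUM to return the disk space
    # to the OS; this can take a while on large databases.
    ackq.shrink_disk_usage()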
Example usage with a file based queue ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -197,6 +247,65 @@ 'b' >>> q.task_done() +Example usage with an auto-saving file based queue +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +*Available since: v0.5.0* + +By default, items added to the queue are persisted during the ``put()`` call, +and items removed from a queue are only persisted when ``task_done()`` is +called. + +.. code-block:: python + + >>> from persistqueue import Queue + >>> q = Queue("mypath") + >>> q.put('a') + >>> q.put('b') + >>> q.get() + 'a' + >>> q.get() + 'b' + +After exiting and restarting the queue from the same path, we see the items +remain in the queue, because ``task_done()`` wasn't called before. + +.. code-block:: python + + >>> from persistqueue import Queue + >>> q = Queue('mypath') + >>> q.get() + 'a' + >>> q.get() + 'b' + +This can be advantageous. For example, if your program crashes before finishing +processing an item, it will remain in the queue after restarting. You can also +spread out the ``task_done()`` calls for performance reasons to avoid lots of +individual writes. + +Using ``autosave=True`` on a file based queue will automatically save on every +call to ``get()``. Calling ``task_done()`` is not necessary, but may still be +used to ``join()`` against the queue. + +.. code-block:: python + + >>> from persistqueue import Queue + >>> q = Queue("mypath", autosave=True) + >>> q.put('a') + >>> q.put('b') + >>> q.get() + 'a' + +After exiting and restarting the queue from the same path, only the second item +remains: + +.. code-block:: python + + >>> from persistqueue import Queue + >>> q = Queue('mypath', autosave=True) + >>> q.get() + 'b' Example usage with a SQLite3 based dict ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -280,10 +389,45 @@ q.join() # block until all tasks are done +**note** + +Due to the limitation of the file queue described in issue `#89 `_, +`task_done` in one thread may acknowledge items being processed in other threads, which it should not. Consider the `SQLiteAckQueue` if you have such a requirement. + + +Serialization via msgpack/json +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +- v0.4.1: Currently only available for file based Queue +- v0.4.2: Also available for SQLite3 based Queues + +.. code-block:: python + + >>> import persistqueue + >>> q = persistqueue.Queue('mypath', serializer=persistqueue.serializers.msgpack) + >>> # via json + >>> # q = persistqueue.Queue('mypath', serializer=persistqueue.serializers.json) + >>> q.get() + 'b' + >>> q.task_done() + +Explicit resource reclaim +^^^^^^^^^^^^^^^^^^^^^^^^^ + +For some reason, an application may require explicit reclamation of file +handles or sql connections before the end of execution. In these cases, the user can +simply call: + +.. code-block:: python + + q = Queue() # or q = persistqueue.SQLiteQueue('mypath', auto_commit=True) + del q + + +to reclaim related file handles or sql connections. + Tips ---- -``task_done`` is required both for filed based queue and SQLite3 based queue (when ``auto_commit=False``) +``task_done`` is required both for file based queue and SQLite3 based queue (when ``auto_commit=False``) to persist the cursor of next ``get`` to the disk. @@ -293,7 +437,7 @@ - **WAL** Starting on v0.3.2, the ``persistqueue`` is leveraging the sqlite3 builtin feature - ``WAL `` which can improve the performance + `WAL `_ which can improve the performance significantly, a general testing indicates that ``persistqueue`` is 2-4 times faster than previous version.
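``persistqueue`` enables WAL internally; it can be verified on an existing queue directory with the standard ``sqlite3`` module. A minimal sketch, assuming the queue's database file is named ``data.db`` inside the queue path (the filename is an assumption for illustration; check the queue directory for the actual ``.db`` file):

.. code-block:: python

    import os
    import sqlite3

    import persistqueue

    q = persistqueue.SQLiteQueue('mypath', auto_commit=True)
    q.put('probe')
    del q  # release the queue's own connections first

    # 'data.db' is an assumed filename; see the note above.
    conn = sqlite3.connect(os.path.join('mypath', 'data.db'))
    print(conn.execute('PRAGMA journal_mode').fetchone()[0])  # expected: 'wal'
    conn.close()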
@@ -335,12 +479,11 @@ Caution ------- -Currently, the atomic operation is not supported on Windows due to the limitation of Python's `os.rename `_, +Currently, the atomic operation is supported on Windows but is still experimental, That's saying, the data in ``persistqueue.Queue`` could be in unreadable state when an incidental failure occurs during ``Queue.task_done``. **DO NOT put any critical data on persistqueue.queue on Windows**. -This issue is under track by issue `Atomic renames on windows `_ Contribution ------------ diff -Nru python-persist-queue-0.4.0/scripts/publish.sh python-persist-queue-0.5.1/scripts/publish.sh --- python-persist-queue-0.4.0/scripts/publish.sh 1970-01-01 00:00:00.000000000 +0000 +++ python-persist-queue-0.5.1/scripts/publish.sh 2020-11-09 05:35:13.000000000 +0000 @@ -0,0 +1,13 @@ +#!/usr/bin/env bash +set -e +BASE_DIR=`pwd` +NAME=$(basename $BASE_DIR) +if [[ "$NAME" != "persist-queue" ]];then + echo "must run this in project root" + exit 1 +fi +python setup.py build sdist +python setup.py build bdist_wheel +twine check ${BASE_DIR}/dist/*.tar.gz +twine check ${BASE_DIR}/dist/*.whl +twine upload ${BASE_DIR}/dist/* diff -Nru python-persist-queue-0.4.0/setup.cfg python-persist-queue-0.5.1/setup.cfg --- python-persist-queue-0.4.0/setup.cfg 2018-06-17 12:52:24.000000000 +0000 +++ python-persist-queue-0.5.1/setup.cfg 2020-11-09 05:35:13.000000000 +0000 @@ -1,7 +1,2 @@ [bdist_wheel] universal = 1 - -[egg_info] -tag_build = -tag_date = 0 - diff -Nru python-persist-queue-0.4.0/setup.py python-persist-queue-0.5.1/setup.py --- python-persist-queue-0.4.0/setup.py 2018-05-19 06:56:15.000000000 +0000 +++ python-persist-queue-0.5.1/setup.py 2020-11-09 05:35:13.000000000 +0000 @@ -3,6 +3,13 @@ from setuptools import setup, find_packages + +def get_extras(): + return { + "extra": open("extra-requirements.txt").read().splitlines() + } + + setup( name='persist-queue', version=__import__('persistqueue').__version__, @@ -16,6 +23,7 @@ maintainer_email='wangxu198709@gmail.com', license=__import__('persistqueue').__license__, packages=find_packages(), + extras_require=get_extras(), platforms=["all"], url='http://github.com/peter-wangxu/persist-queue', classifiers=[ diff -Nru python-persist-queue-0.4.0/test-requirements.txt python-persist-queue-0.5.1/test-requirements.txt --- python-persist-queue-0.4.0/test-requirements.txt 2018-05-19 06:56:15.000000000 +0000 +++ python-persist-queue-0.5.1/test-requirements.txt 2020-11-09 05:35:13.000000000 +0000 @@ -1,8 +1,8 @@ mock>=2.0.0 flake8>=3.2.1 eventlet>=0.19.0 +msgpack>=0.5.6 nose2>=0.6.5 coverage!=4.5 cov_core>=1.15.0 -virtualenv>=15.1.0 - +virtualenv>=15.1.0 \ No newline at end of file diff -Nru python-persist-queue-0.4.0/tests/test_pdict.py python-persist-queue-0.5.1/tests/test_pdict.py --- python-persist-queue-0.4.0/tests/test_pdict.py 2018-05-19 06:56:15.000000000 +0000 +++ python-persist-queue-0.5.1/tests/test_pdict.py 1970-01-01 00:00:00.000000000 +0000 @@ -1,71 +0,0 @@ - -import shutil -import tempfile -import unittest - -from persistqueue import pdict - - -class PDictTest(unittest.TestCase): - def setUp(self): - self.path = tempfile.mkdtemp(suffix='pdict') - - def tearDown(self): - shutil.rmtree(self.path, ignore_errors=True) - - def test_unsupported(self): - pd = pdict.PDict(self.path, 'pd') - pd['key_a'] = 'value_a' - self.assertRaises(NotImplementedError, pd.keys) - self.assertRaises(NotImplementedError, pd.iterkeys) - self.assertRaises(NotImplementedError, pd.values) - self.assertRaises(NotImplementedError, pd.itervalues) -
self.assertRaises(NotImplementedError, pd.items) - self.assertRaises(NotImplementedError, pd.iteritems) - - def _for(): - for _ in pd: - pass - self.assertRaises(NotImplementedError, _for) - - def test_add(self): - pd = pdict.PDict(self.path, 'pd') - pd['key_a'] = 'value_a' - self.assertEqual(pd['key_a'], 'value_a') - self.assertTrue('key_a' in pd) - self.assertFalse('key_b' in pd) - self.assertRaises(KeyError, lambda: pd['key_b']) - pd['key_b'] = 'value_b' - self.assertEqual(pd['key_a'], 'value_a') - self.assertEqual(pd['key_b'], 'value_b') - - def test_set(self): - pd = pdict.PDict(self.path, 'pd') - pd['key_a'] = 'value_a' - pd['key_b'] = 'value_b' - self.assertEqual(pd['key_a'], 'value_a') - self.assertEqual(pd['key_b'], 'value_b') - pd['key_a'] = 'value_aaaaaa' - self.assertEqual(pd['key_a'], 'value_aaaaaa') - self.assertEqual(pd['key_b'], 'value_b') - - def test_delete(self): - pd = pdict.PDict(self.path, 'pd') - pd['key_a'] = 'value_a' - pd['key_b'] = 'value_b' - self.assertEqual(pd['key_a'], 'value_a') - self.assertEqual(pd['key_b'], 'value_b') - del pd['key_a'] - self.assertFalse('key_a' in pd) - self.assertRaises(KeyError, lambda: pd['key_a']) - self.assertEqual(pd['key_b'], 'value_b') - - def test_two_dicts(self): - pd_1 = pdict.PDict(self.path, '1') - pd_2 = pdict.PDict(self.path, '2') - pd_1['key_a'] = 'value_a' - pd_2['key_b'] = 'value_b' - self.assertEqual(pd_1['key_a'], 'value_a') - self.assertEqual(pd_2['key_b'], 'value_b') - self.assertRaises(KeyError, lambda: pd_1['key_b']) - self.assertRaises(KeyError, lambda: pd_2['key_a']) diff -Nru python-persist-queue-0.4.0/tests/test_queue.py python-persist-queue-0.5.1/tests/test_queue.py --- python-persist-queue-0.4.0/tests/test_queue.py 2018-05-19 06:56:15.000000000 +0000 +++ python-persist-queue-0.5.1/tests/test_queue.py 1970-01-01 00:00:00.000000000 +0000 @@ -1,262 +0,0 @@ -# coding=utf-8 - -import mock -import os -import pickle -import random -import shutil -import sys -import tempfile -import unittest -from threading import Thread - -from persistqueue import Queue, Empty, Full - - -class PersistTest(unittest.TestCase): - def setUp(self): - self.path = tempfile.mkdtemp(suffix='queue') - - def tearDown(self): - shutil.rmtree(self.path, ignore_errors=True) - - def test_open_close_single(self): - """Write 1 item, close, reopen checking if same item is there""" - - q = Queue(self.path) - q.put(b'var1') - del q - q = Queue(self.path) - self.assertEqual(1, q.qsize()) - self.assertEqual(b'var1', q.get()) - q.task_done() - - def test_open_close_1000(self): - """Write 1000 items, close, reopen checking if all items are there""" - - q = Queue(self.path) - for i in range(1000): - q.put('var%d' % i) - self.assertEqual(1000, q.qsize()) - del q - q = Queue(self.path) - self.assertEqual(1000, q.qsize()) - for i in range(1000): - data = q.get() - self.assertEqual('var%d' % i, data) - q.task_done() - with self.assertRaises(Empty): - q.get_nowait() - # assert adding another one still works - q.put(b'foobar') - data = q.get() - - def test_partial_write(self): - """Test recovery from previous crash w/ partial write""" - - q = Queue(self.path) - for i in range(100): - q.put('var%d' % i) - del q - with open(os.path.join(self.path, 'q00000'), 'ab') as f: - pickle.dump('文字化け', f) - q = Queue(self.path) - self.assertEqual(100, q.qsize()) - for i in range(100): - self.assertEqual('var%d' % i, q.get()) - q.task_done() - with self.assertRaises(Empty): - q.get_nowait() - - def test_random_read_write(self): - """Test random read/write""" - - q = 
Queue(self.path) - n = 0 - for i in range(1000): - if random.random() < 0.5: - if n > 0: - q.get_nowait() - q.task_done() - n -= 1 - else: - with self.assertRaises(Empty): - q.get_nowait() - else: - q.put('var%d' % random.getrandbits(16)) - n += 1 - - def test_multi_threaded(self): - """Create consumer and producer threads, check parallelism""" - - q = Queue(self.path) - - def producer(): - for i in range(1000): - q.put('var%d' % i) - - def consumer(): - for i in range(1000): - q.get() - q.task_done() - - c = Thread(target=consumer) - c.start() - p = Thread(target=producer) - p.start() - c.join() - p.join() - - q.join() - with self.assertRaises(Empty): - q.get_nowait() - - def test_garbage_on_head(self): - """Adds garbage to the queue head and let the internal integrity - checks fix it""" - - q = Queue(self.path) - q.put(b'var1') - del q - - with open(os.path.join(self.path, 'q00000'), 'ab') as f: - f.write(b'garbage') - q = Queue(self.path) - q.put(b'var2') - - self.assertEqual(2, q.qsize()) - self.assertEqual(b'var1', q.get()) - q.task_done() - - def test_task_done_too_many_times(self): - """Test too many task_done called.""" - q = Queue(self.path) - q.put(b'var1') - q.get() - q.task_done() - - with self.assertRaises(ValueError): - q.task_done() - - def test_get_timeout_negative(self): - q = Queue(self.path) - q.put(b'var1') - with self.assertRaises(ValueError): - q.get(timeout=-1) - - def test_get_timeout(self): - """Test when get failed within timeout.""" - q = Queue(self.path) - q.put(b'var1') - q.get() - with self.assertRaises(Empty): - q.get(timeout=1) - - def test_put_nowait(self): - """Tests the put_nowait interface.""" - q = Queue(self.path) - q.put_nowait(b'var1') - self.assertEqual(b'var1', q.get()) - q.task_done() - - def test_put_maxsize_reached(self): - """Test that maxsize reached.""" - q = Queue(self.path, maxsize=10) - for x in range(10): - q.put(x) - - with self.assertRaises(Full): - q.put(b'full_now', block=False) - - def test_put_timeout_reached(self): - """Test put with block and timeout.""" - q = Queue(self.path, maxsize=2) - for x in range(2): - q.put(x) - - with self.assertRaises(Full): - q.put(b'full_and_timeout', block=True, timeout=1) - - def test_put_timeout_negative(self): - """Test and put with timeout < 0""" - q = Queue(self.path, maxsize=1) - with self.assertRaises(ValueError): - q.put(b'var1', timeout=-1) - - def test_put_block_and_wait(self): - """Test block until queue is not full.""" - q = Queue(self.path, maxsize=10) - - def consumer(): - for i in range(5): - q.get() - q.task_done() - - def producer(): - for j in range(16): - q.put('var%d' % j) - - p = Thread(target=producer) - p.start() - c = Thread(target=consumer) - c.start() - c.join() - val = q.get_nowait() - p.join() - self.assertEqual('var5', val) - - def test_clear_tail_file(self): - """Teat that only remove tail file when calling task_done.""" - q = Queue(self.path, chunksize=10) - for i in range(35): - q.put('var%d' % i) - - for _ in range(15): - q.get() - - q = Queue(self.path, chunksize=10) - self.assertEqual(q.qsize(), 35) - - for _ in range(15): - q.get() - # the first tail file gets removed after task_done - q.task_done() - for _ in range(16): - q.get() - # the second and third files get removed after task_done - q.task_done() - self.assertEqual(q.qsize(), 4) - - def test_windows_error(self): - """Test the rename restrictions of Windows""" - q = Queue(self.path) - q.put(b'a') - fake_error = OSError('Cannot create a file when' - 'that file already exists') - setattr(fake_error, 
'winerror', 183) - os_rename = os.rename - i = [] - - def fake_remove(src, dst): - if not i: - i.append(1) - raise fake_error - else: - i.append(2) - os_rename(src, dst) - - with mock.patch('os.rename', new=fake_remove): - q.put(b'b') - - self.assertTrue(b'a', q.get()) - self.assertTrue(b'b', q.get()) - - def test_protocol_1(self): - shutil.rmtree(self.path) - - q = Queue(path=self.path) - self.assertEqual(q.protocol, 2 if sys.version_info[0] == 2 else 4) - - def test_protocol_2(self): - q = Queue(path=self.path) - self.assertEqual(q.protocol, None) diff -Nru python-persist-queue-0.4.0/tests/test_sqlackqueue.py python-persist-queue-0.5.1/tests/test_sqlackqueue.py --- python-persist-queue-0.4.0/tests/test_sqlackqueue.py 2018-06-03 13:15:35.000000000 +0000 +++ python-persist-queue-0.5.1/tests/test_sqlackqueue.py 1970-01-01 00:00:00.000000000 +0000 @@ -1,324 +0,0 @@ -# coding=utf-8 - -import random -import shutil -import sys -import tempfile -import unittest -from threading import Thread - -from persistqueue.sqlackqueue import ( - SQLiteAckQueue, - FILOSQLiteAckQueue, - UniqueAckQ) -from persistqueue import Empty - - -class SQLite3AckQueueTest(unittest.TestCase): - def setUp(self): - self.path = tempfile.mkdtemp(suffix='sqlackqueue') - self.auto_commit = True - - def tearDown(self): - shutil.rmtree(self.path, ignore_errors=True) - - def test_raise_empty(self): - q = SQLiteAckQueue(self.path, auto_commit=self.auto_commit) - - q.put('first') - d = q.get() - self.assertEqual('first', d) - self.assertRaises(Empty, q.get, block=False) - - # assert with timeout - self.assertRaises(Empty, q.get, block=True, timeout=1.0) - # assert with negative timeout - self.assertRaises(ValueError, q.get, block=True, timeout=-1.0) - - def test_open_close_single(self): - """Write 1 item, close, reopen checking if same item is there""" - - q = SQLiteAckQueue(self.path, auto_commit=self.auto_commit) - q.put(b'var1') - del q - q = SQLiteAckQueue(self.path) - self.assertEqual(1, q.qsize()) - self.assertEqual(b'var1', q.get()) - - def test_open_close_1000(self): - """Write 1000 items, close, reopen checking if all items are there""" - - q = SQLiteAckQueue(self.path, auto_commit=self.auto_commit) - for i in range(1000): - q.put('var%d' % i) - - self.assertEqual(1000, q.qsize()) - del q - q = SQLiteAckQueue(self.path) - self.assertEqual(1000, q.qsize()) - for i in range(1000): - data = q.get() - self.assertEqual('var%d' % i, data) - # assert adding another one still works - q.put('foobar') - data = q.get() - self.assertEqual('foobar', data) - - def test_random_read_write(self): - """Test random read/write""" - - q = SQLiteAckQueue(self.path, auto_commit=self.auto_commit) - n = 0 - for _ in range(1000): - if random.random() < 0.5: - if n > 0: - q.get() - n -= 1 - else: - self.assertRaises(Empty, q.get, block=False) - else: - q.put('var%d' % random.getrandbits(16)) - n += 1 - - def test_multi_threaded_parallel(self): - """Create consumer and producer threads, check parallelism""" - - # self.skipTest("Not supported multi-thread.") - - m_queue = SQLiteAckQueue( - path=self.path, multithreading=True, - auto_commit=self.auto_commit - ) - - def producer(): - for i in range(1000): - m_queue.put('var%d' % i) - - def consumer(): - for i in range(1000): - x = m_queue.get(block=True) - self.assertEqual('var%d' % i, x) - - c = Thread(target=consumer) - c.start() - p = Thread(target=producer) - p.start() - p.join() - c.join() - self.assertEqual(0, m_queue.size) - self.assertEqual(0, len(m_queue)) - self.assertRaises(Empty, m_queue.get, 
block=False) - - def test_multi_threaded_multi_producer(self): - """Test sqlqueue can be used by multiple producers.""" - queue = SQLiteAckQueue( - path=self.path, multithreading=True, - auto_commit=self.auto_commit - ) - - def producer(seq): - for i in range(10): - queue.put('var%d' % (i + (seq * 10))) - - def consumer(): - for _ in range(100): - data = queue.get(block=True) - self.assertTrue('var' in data) - - c = Thread(target=consumer) - c.start() - producers = [] - for seq in range(10): - t = Thread(target=producer, args=(seq,)) - t.start() - producers.append(t) - - for t in producers: - t.join() - - c.join() - - def test_multiple_consumers(self): - """Test sqlqueue can be used by multiple consumers.""" - - queue = SQLiteAckQueue( - path=self.path, multithreading=True, - auto_commit=self.auto_commit - ) - - def producer(): - for x in range(1000): - queue.put('var%d' % x) - - counter = [] - # Set all to 0 - for _ in range(1000): - counter.append(0) - - def consumer(index): - for i in range(200): - data = queue.get(block=True) - self.assertTrue('var' in data) - counter[index * 200 + i] = data - - p = Thread(target=producer) - p.start() - consumers = [] - for index in range(5): - t = Thread(target=consumer, args=(index,)) - t.start() - consumers.append(t) - - p.join() - for t in consumers: - t.join() - - self.assertEqual(0, queue.qsize()) - for x in range(1000): - self.assertNotEqual(0, counter[x], - "not 0 for counter's index %s" % x) - - def test_protocol_1(self): - shutil.rmtree(self.path, ignore_errors=True) - q = SQLiteAckQueue(path=self.path) - self.assertEqual(q.protocol, 2 if sys.version_info[0] == 2 else 4) - - def test_protocol_2(self): - q = SQLiteAckQueue(path=self.path) - self.assertEqual(q.protocol, None) - - def test_ack_and_clear(self): - q = SQLiteAckQueue(path=self.path) - q._MAX_ACKED_LENGTH = 10 - ret_list = [] - for _ in range(100): - q.put("val%s" % _) - for _ in range(100): - ret_list.append(q.get()) - for ret in ret_list: - q.ack(ret) - self.assertEqual(q.acked_count(), 100) - q.clear_acked_data() - self.assertEqual(q.acked_count(), 10) - - def test_ack_unknown_item(self): - q = SQLiteAckQueue(path=self.path) - q.put("val1") - val1 = q.get() - q.ack("val2") - q.nack("val3") - q.ack_failed("val4") - self.assertEqual(q.qsize(), 0) - self.assertEqual(q.unack_count(), 1) - q.ack(val1) - self.assertEqual(q.unack_count(), 0) - - def test_ack_unack_ack_failed(self): - q = SQLiteAckQueue(path=self.path) - q.put("val1") - q.put("val2") - q.put("val3") - val1 = q.get() - val2 = q.get() - val3 = q.get() - # qsize should be zero when all item is getted from q - self.assertEqual(q.qsize(), 0) - self.assertEqual(q.unack_count(), 3) - # nack will let the item requeued as ready status - q.nack(val1) - self.assertEqual(q.qsize(), 1) - self.assertEqual(q.ready_count(), 1) - # ack failed is just mark item as ack failed - q.ack_failed(val3) - self.assertEqual(q.ack_failed_count(), 1) - # ack should not effect qsize - q.ack(val2) - self.assertEqual(q.acked_count(), 1) - self.assertEqual(q.qsize(), 1) - # all ack* related action will reduce unack count - self.assertEqual(q.unack_count(), 0) - # reget the nacked item - ready_val = q.get() - self.assertEqual(ready_val, val1) - q.ack(ready_val) - self.assertEqual(q.qsize(), 0) - self.assertEqual(q.acked_count(), 2) - self.assertEqual(q.ready_count(), 0) - - -class SQLite3QueueInMemory(SQLite3AckQueueTest): - def setUp(self): - self.path = ":memory:" - self.auto_commit = True - - def test_open_close_1000(self): - self.skipTest('Memory 
based sqlite is not persistent.') - - def test_open_close_single(self): - self.skipTest('Memory based sqlite is not persistent.') - - def test_multiple_consumers(self): - self.skipTest('Skipped due to occasional crash during ' - 'multithreading mode.') - - def test_multi_threaded_multi_producer(self): - self.skipTest('Skipped due to occasional crash during ' - 'multithreading mode.') - - def test_multi_threaded_parallel(self): - self.skipTest('Skipped due to occasional crash during ' - 'multithreading mode.') - - def test_task_done_with_restart(self): - self.skipTest('Skipped due to not persistent.') - - def test_protocol_2(self): - self.skipTest('In memory queue is always new.') - - -class FILOSQLite3AckQueueTest(unittest.TestCase): - def setUp(self): - self.path = tempfile.mkdtemp(suffix='filo_sqlackqueue') - self.auto_commit = True - - def tearDown(self): - shutil.rmtree(self.path, ignore_errors=True) - - def test_open_close_1000(self): - """Write 1000 items, close, reopen checking if all items are there""" - - q = FILOSQLiteAckQueue(self.path, auto_commit=self.auto_commit) - for i in range(1000): - q.put('var%d' % i) - self.assertEqual(1000, q.qsize()) - del q - q = FILOSQLiteAckQueue(self.path) - self.assertEqual(1000, q.qsize()) - for i in range(1000): - data = q.get() - self.assertEqual('var%d' % (999 - i), data) - # assert adding another one still works - q.put('foobar') - data = q.get() - self.assertEqual('foobar', data) - - -class SQLite3UniqueAckQueueTest(unittest.TestCase): - def setUp(self): - self.path = tempfile.mkdtemp(suffix='sqlackqueue') - self.auto_commit = True - - def test_add_duplicate_item(self): - q = UniqueAckQ(self.path) - q.put(1111) - self.assertEqual(1, q.size) - # put duplicate item - q.put(1111) - self.assertEqual(1, q.size) - - q.put(2222) - self.assertEqual(2, q.size) - - del q - q = UniqueAckQ(self.path) - self.assertEqual(2, q.size) diff -Nru python-persist-queue-0.4.0/tests/test_sqlqueue.py python-persist-queue-0.5.1/tests/test_sqlqueue.py --- python-persist-queue-0.4.0/tests/test_sqlqueue.py 2018-06-10 14:51:10.000000000 +0000 +++ python-persist-queue-0.5.1/tests/test_sqlqueue.py 1970-01-01 00:00:00.000000000 +0000 @@ -1,352 +0,0 @@ -# coding=utf-8 - -import random -import shutil -import sys -import tempfile -import unittest -from threading import Thread - -from persistqueue import SQLiteQueue, FILOSQLiteQueue, UniqueQ -from persistqueue import Empty - - -class SQLite3QueueTest(unittest.TestCase): - def setUp(self): - self.path = tempfile.mkdtemp(suffix='sqlqueue') - self.auto_commit = True - - def tearDown(self): - shutil.rmtree(self.path, ignore_errors=True) - - def test_raise_empty(self): - q = SQLiteQueue(self.path, auto_commit=self.auto_commit) - - q.put('first') - d = q.get() - self.assertEqual('first', d) - self.assertRaises(Empty, q.get, block=False) - - # assert with timeout - self.assertRaises(Empty, q.get, block=True, timeout=1.0) - # assert with negative timeout - self.assertRaises(ValueError, q.get, block=True, timeout=-1.0) - - def test_open_close_single(self): - """Write 1 item, close, reopen checking if same item is there""" - - q = SQLiteQueue(self.path, auto_commit=self.auto_commit) - q.put(b'var1') - del q - q = SQLiteQueue(self.path) - self.assertEqual(1, q.qsize()) - self.assertEqual(b'var1', q.get()) - - def test_open_close_1000(self): - """Write 1000 items, close, reopen checking if all items are there""" - - q = SQLiteQueue(self.path, auto_commit=self.auto_commit) - for i in range(1000): - q.put('var%d' % i) - - 
self.assertEqual(1000, q.qsize()) - del q - q = SQLiteQueue(self.path) - self.assertEqual(1000, q.qsize()) - for i in range(1000): - data = q.get() - self.assertEqual('var%d' % i, data) - # assert adding another one still works - q.put('foobar') - data = q.get() - self.assertEqual('foobar', data) - - def test_random_read_write(self): - """Test random read/write""" - - q = SQLiteQueue(self.path, auto_commit=self.auto_commit) - n = 0 - for _ in range(1000): - if random.random() < 0.5: - if n > 0: - q.get() - n -= 1 - else: - self.assertRaises(Empty, q.get, block=False) - else: - q.put('var%d' % random.getrandbits(16)) - n += 1 - - def test_multi_threaded_parallel(self): - """Create consumer and producer threads, check parallelism""" - - # self.skipTest("Not supported multi-thread.") - - m_queue = SQLiteQueue(path=self.path, multithreading=True, - auto_commit=self.auto_commit) - - def producer(): - for i in range(1000): - m_queue.put('var%d' % i) - - def consumer(): - for i in range(1000): - x = m_queue.get(block=True) - self.assertEqual('var%d' % i, x) - - c = Thread(target=consumer) - c.start() - p = Thread(target=producer) - p.start() - p.join() - c.join() - self.assertEqual(0, m_queue.size) - self.assertEqual(0, len(m_queue)) - self.assertRaises(Empty, m_queue.get, block=False) - - def test_multi_threaded_multi_producer(self): - """Test sqlqueue can be used by multiple producers.""" - queue = SQLiteQueue(path=self.path, multithreading=True, - auto_commit=self.auto_commit) - - def producer(seq): - for i in range(10): - queue.put('var%d' % (i + (seq * 10))) - - def consumer(): - for _ in range(100): - data = queue.get(block=True) - self.assertTrue('var' in data) - - c = Thread(target=consumer) - c.start() - producers = [] - for seq in range(10): - t = Thread(target=producer, args=(seq,)) - t.start() - producers.append(t) - - for t in producers: - t.join() - - c.join() - - def test_multiple_consumers(self): - """Test sqlqueue can be used by multiple consumers.""" - - queue = SQLiteQueue(path=self.path, multithreading=True, - auto_commit=self.auto_commit) - - def producer(): - for x in range(1000): - queue.put('var%d' % x) - - counter = [] - # Set all to 0 - for _ in range(1000): - counter.append(0) - - def consumer(index): - for i in range(200): - data = queue.get(block=True) - self.assertTrue('var' in data) - counter[index * 200 + i] = data - - p = Thread(target=producer) - p.start() - consumers = [] - for index in range(5): - t = Thread(target=consumer, args=(index,)) - t.start() - consumers.append(t) - - p.join() - for t in consumers: - t.join() - - self.assertEqual(0, queue.qsize()) - for x in range(1000): - self.assertNotEqual(0, counter[x], - "not 0 for counter's index %s" % x) - - self.assertEqual(len(set(counter)), len(counter)) - - def test_task_done_with_restart(self): - """Test that items are not deleted before task_done.""" - - q = SQLiteQueue(path=self.path, auto_commit=False) - - for i in range(1, 11): - q.put(i) - - self.assertEqual(1, q.get()) - self.assertEqual(2, q.get()) - # size is correct before task_done - self.assertEqual(8, q.qsize()) - q.task_done() - # make sure the size still correct - self.assertEqual(8, q.qsize()) - - self.assertEqual(3, q.get()) - # without task done - del q - q = SQLiteQueue(path=self.path, auto_commit=False) - # After restart, the qsize and head item are the same - self.assertEqual(8, q.qsize()) - # After restart, the queue still works - self.assertEqual(3, q.get()) - self.assertEqual(7, q.qsize()) - - def test_protocol_1(self): - 
shutil.rmtree(self.path, ignore_errors=True) - q = SQLiteQueue(path=self.path) - self.assertEqual(q.protocol, 2 if sys.version_info[0] == 2 else 4) - - def test_protocol_2(self): - q = SQLiteQueue(path=self.path) - self.assertEqual(q.protocol, None) - - -class SQLite3QueueNoAutoCommitTest(SQLite3QueueTest): - def setUp(self): - self.path = tempfile.mkdtemp(suffix='sqlqueue_auto_commit') - self.auto_commit = False - - def test_multiple_consumers(self): - """ - FAIL: test_multiple_consumers ( - -tests.test_sqlqueue.SQLite3QueueNoAutoCommitTest) - Test sqlqueue can be used by multiple consumers. - ---------------------------------------------------------------------- - Traceback (most recent call last): - File "persist-queue\tests\test_sqlqueue.py", line 183, - -in test_multiple_consumers - self.assertEqual(0, queue.qsize()) - AssertionError: 0 != 72 - :return: - """ - self.skipTest('Skipped due to a known bug above.') - - -class SQLite3QueueInMemory(SQLite3QueueTest): - def setUp(self): - self.path = ":memory:" - self.auto_commit = True - - def test_open_close_1000(self): - self.skipTest('Memory based sqlite is not persistent.') - - def test_open_close_single(self): - self.skipTest('Memory based sqlite is not persistent.') - - def test_multiple_consumers(self): - self.skipTest('Skipped due to occasional crash during ' - 'multithreading mode.') - - def test_multi_threaded_multi_producer(self): - self.skipTest('Skipped due to occasional crash during ' - 'multithreading mode.') - - def test_multi_threaded_parallel(self): - self.skipTest('Skipped due to occasional crash during ' - 'multithreading mode.') - - def test_task_done_with_restart(self): - self.skipTest('Skipped due to not persistent.') - - def test_protocol_2(self): - self.skipTest('In memory queue is always new.') - - -class FILOSQLite3QueueTest(unittest.TestCase): - def setUp(self): - self.path = tempfile.mkdtemp(suffix='filo_sqlqueue') - self.auto_commit = True - - def tearDown(self): - shutil.rmtree(self.path, ignore_errors=True) - - def test_open_close_1000(self): - """Write 1000 items, close, reopen checking if all items are there""" - - q = FILOSQLiteQueue(self.path, auto_commit=self.auto_commit) - for i in range(1000): - q.put('var%d' % i) - self.assertEqual(1000, q.qsize()) - del q - q = FILOSQLiteQueue(self.path) - self.assertEqual(1000, q.qsize()) - for i in range(1000): - data = q.get() - self.assertEqual('var%d' % (999 - i), data) - # assert adding another one still works - q.put('foobar') - data = q.get() - self.assertEqual('foobar', data) - - -class FILOSQLite3QueueNoAutoCommitTest(FILOSQLite3QueueTest): - def setUp(self): - self.path = tempfile.mkdtemp(suffix='filo_sqlqueue_auto_commit') - self.auto_commit = False - - -class SQLite3UniqueQueueTest(unittest.TestCase): - def setUp(self): - self.path = tempfile.mkdtemp(suffix='sqlqueue') - self.auto_commit = True - - def test_add_duplicate_item(self): - q = UniqueQ(self.path) - q.put(1111) - self.assertEqual(1, q.size) - # put duplicate item - q.put(1111) - self.assertEqual(1, q.size) - - q.put(2222) - self.assertEqual(2, q.size) - - del q - q = UniqueQ(self.path) - self.assertEqual(2, q.size) - - def test_multiple_consumers(self): - """Test UniqueQ can be used by multiple consumers.""" - - queue = UniqueQ(path=self.path, multithreading=True, - auto_commit=self.auto_commit) - - def producer(): - for x in range(1000): - queue.put('var%d' % x) - - counter = [] - # Set all to 0 - for _ in range(1000): - counter.append(0) - - def consumer(index): - for i in range(200): - data 
= queue.get(block=True) - self.assertTrue('var' in data) - counter[index * 200 + i] = data - - p = Thread(target=producer) - p.start() - consumers = [] - for index in range(5): - t = Thread(target=consumer, args=(index,)) - t.start() - consumers.append(t) - - p.join() - for t in consumers: - t.join() - - self.assertEqual(0, queue.qsize()) - for x in range(1000): - self.assertNotEqual(0, counter[x], - "not 0 for counter's index %s" % x) - - self.assertEqual(len(set(counter)), len(counter)) diff -Nru python-persist-queue-0.4.0/tox.ini python-persist-queue-0.5.1/tox.ini --- python-persist-queue-0.4.0/tox.ini 1970-01-01 00:00:00.000000000 +0000 +++ python-persist-queue-0.5.1/tox.ini 2020-11-09 05:35:13.000000000 +0000 @@ -0,0 +1,28 @@ +[tox] + +minversion = 2.0 +skipsdist = True +envlist = py27, py34, py35, py36, py37, pep8, cover +deps = -r{toxinidir}/test-requirements.txt + +[testenv] + +setenv = VIRTUAL_ENV={envdir} + +usedevelop = True +deps = -r{toxinidir}/test-requirements.txt +whitelist_externals = + bash + find +commands = + nose2 {posargs} + +[testenv:pep8] + +commands = + flake8 ./persistqueue ./tests {posargs} + +[testenv:cover] + +commands = + nose2 --with-coverage --coverage-report xml --coverage-report html --coverage-report term {posargs}