diff -Nru bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/bhindex/fusefs.py bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/bhindex/fusefs.py --- bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/bhindex/fusefs.py 1970-01-01 00:00:00.000000000 +0000 +++ bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/bhindex/fusefs.py 2017-10-20 13:01:54.000000000 +0000 @@ -0,0 +1,344 @@ +#!/usr/bin/env python + +from __future__ import division, print_function, absolute_import + +import os +import sys +import warnings + +import errno +import itertools +import logging +import stat + +from time import time +from threading import Thread + +from .scanner import Scanner +from .tree import Filesystem, FoundError, NotFoundError +from .util import set_new_availability +from bithorde import Client, parseConfig, message +from distdb import DB + +import fusell + +log = logging.getLogger() + +# For Python 2 + 3 compatibility +if sys.version_info[0] == 2: + def next(it): + return it.next() +else: + buffer = memoryview + +current_uid = os.getuid() +current_gid = os.getgid() + +ino_source = itertools.count(1) +fh_source = itertools.count(1) + + +class Pool(set): + def __init__(self, create): + self._create = create + set.__init__(self) + + def get(self): + try: + return self.pop() + except KeyError: + return self._create() + + def put(self, x): + self.add(x) + + +ino_pool = Pool(create=lambda: next(ino_source)) +fh_pool = Pool(create=lambda: next(fh_source)) +base_fields = set((u'directory', u'name', u'ext', u'xt', u'bh_status', u'bh_status_confirmed', u'bh_availability', u'filesize')) + + +class IDResource(object): + def __init__(self): + self._dict = dict() + iter = itertools.count(1) + self._pool = Pool(create=lambda: next(iter)) + + def insert(self, value): + id = self._pool.get() + self._dict[id] = value + return id + + def __delitem__(self, id): + del self._dict[id] + self._pool.put(id) + + def __getitem__(self, id): + return self._dict[id] + + +class INode(object): + __slots__ = ('ino') + MODE_0755 = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH + MODE_0555 = stat.S_IRUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH + + @staticmethod + def map(x): + if hasattr(x, 'ls'): + return Directory(x) + else: + return File(x) + + def __init__(self): + super(INode, self).__init__() + self.ino = ino_pool.get() + + def __del__(self): + ino_pool.put(self.ino) + + def entry(self): + entry = fusell.fuse_entry_param() + entry.ino = self.ino + entry.generation = 0 + entry.entry_timeout = 2 + entry.attr_timeout = 10 + + entry.attr = self.attr() + + return entry + + def attr(self): + attr = fusell.c_stat() + + attr.st_ino = self.ino + attr.st_mode = self.MODE_0555 + attr.st_nlink = 1 + attr.st_uid = current_uid + attr.st_gid = current_gid + attr.st_rdev = 1 + attr.st_size = 0 + + attr.st_blksize = 512 + attr.st_blocks = 1 + now = time() + attr.st_atime = now + attr.st_mtime = now + attr.st_ctime = now + + return attr + + +class File(INode): + __slots__ = ('f') + + def __init__(self, f): + INode.__init__(self) + self.f = f + + def attr(self): + attr = INode.attr(self) + attr.st_mode |= stat.S_IFREG + attr.st_size = int(self.f.obj.any(u'filesize', 0)) + return attr + + def ids(self): + return self.f.ids().proto_ids() + + +class Directory(INode): + __slots__ = ('d', 'readers') + + def __init__(self, d): + INode.__init__(self) + self.d = d + self.readers = IDResource() + + def attr(self): + attr = INode.attr(self) + attr.st_mode |= stat.S_IFDIR + attr.st_nlink = 2 + attr.st_size = 0 + return attr + + def lookup(self, name): + try: + f = self.d[name] + except NotFoundError: + raise(fusell.FUSEError(errno.ENOENT)) + else: + return INode.map(f) + + def readdir(self, id): + if id == 0: + iter = self.d.ls() + id = self.readers.insert(iter) + else: + iter = self.readers[id] + + for name, f in iter: + yield name, INode.map(f), id + + def mkdir(self, name): + return Directory(self.d.mkdir(name)) + + +class Operations(fusell.Filesystem): + def __init__(self, bithorde, database): + self.fs = Filesystem(database) + self.root = Directory(self.fs.root()) + + self.inodes = { + fusell.ROOT_INODE: self.root + } + self.files = {} + + self.bithorde = bithorde + + def _inode_resolve(self, ino, cls=INode): + try: + inode = self.inodes[ino] + assert isinstance(inode, cls) + return inode + except KeyError: + raise(fusell.FUSEError(errno.ENOENT)) + + def lookup(self, inode_p, name): + inode_p = self._inode_resolve(inode_p, Directory) + inode = inode_p.lookup(name.decode('utf-8')) + self.inodes[inode.ino] = inode + return inode.entry() + + def forget(self, ino, nlookup): + # Assuming the kernel only notifies when nlookup really reaches 0 + try: + del self.inodes[ino] + except KeyError: + warnings.warn('Tried to forget something already missing.') + + def getattr(self, inode): + inode = self._inode_resolve(inode) + return inode.attr(), 10 + + def opendir(self, inode): + inode = self._inode_resolve(inode, Directory) + return inode.ino + + def readdir(self, inode, id): + directory = self._inode_resolve(inode, Directory) + + for name, inode, id in directory.readdir(id): + yield name.encode('utf8'), inode.attr(), id + + def releasedir(self, inode): + pass + + def open(self, inode, flags): + inode = self._inode_resolve(inode, File) + unsupported_flags = os.O_WRONLY | os.O_RDWR + if (flags & unsupported_flags): + raise(fusell.FUSEError(errno.EINVAL)) + fh = fh_pool.get() + asset = self.bithorde.open(inode.ids()) + status = asset.status() + status_ok = status and status.status == message.SUCCESS + + if not status_ok: + db = self.fs.db + with db.transaction() as tr: + set_new_availability(inode.f.obj, status_ok) + tr.update(inode.f.obj) + raise(fusell.FUSEError(errno.ENOENT)) + + self.files[fh] = asset + return fh + + def read(self, fh, off, size): + try: + f = self.files[fh] + except KeyError: + raise(fusell.FUSEError(errno.EBADF)) + + return f.read(off, size) + + def release(self, fh): + try: + del self.files[fh] + except KeyError: + warnings.warn("Trying to release unknown file handle: %s" % fh) + + def statfs(self): + stat = fusell.c_statvfs + stat.f_bsize = 64 * 1024 + stat.f_frsize = 64 * 1024 + stat.f_blocks = stat.f_bfree = stat.f_bavail = 0 + stat.f_files = stat.f_ffree = stat.f_favail = 0 + return stat + + def mkdir(self, parent, name): + parent = self._inode_resolve(parent, Directory) + try: + return parent.mkdir(name).entry() + except FoundError: + raise(fusell.FUSEError(errno.EEXIST)) + except NotFoundError: + raise(fusell.FUSEError(errno.ENOENT)) + + def rename(self, parent, name, newparent, newname): + parent = self._inode_resolve(parent, Directory) + newparent = self._inode_resolve(newparent, Directory) + + try: + self.fs.mv((parent.d, name), (newparent.d, newname)) + except FoundError: + raise(fusell.FUSEError(errno.EEXIST)) + except NotFoundError: + raise(fusell.FUSEError(errno.ENOENT)) + + +def background_scan(args, config): + while True: + try: + bithorde = Client(parseConfig(config.items('BITHORDE')), autoconnect=False) + bithorde.connect() + + Scanner(DB(args.db), bithorde).run() + except Exception: + log.exception("Error in scanner") + + +def prepare_args(parser, config): + parser.add_argument("--fs-debug", action="store_true", default=False, + help="Enable FS-debugging") + parser.add_argument("--no-scan", dest="scan", action="store_false", default=True, + help="Don't run local scanner, rely on remote") + parser.add_argument("mountpoint", help="Directory to mount the file under, I.E. 'dir/file'") + parser.set_defaults(main=main, setup=setup) + + +def setup(args, config, db): + bithorde = Client(parseConfig(config.items('BITHORDE')), autoconnect=False) + bithorde.connect() + + fsopts = ['nonempty', 'allow_other', 'max_read=131072', 'fsname=bhindex'] + if args.fs_debug: + fsopts.append('debug') + ops = Operations(database=db, bithorde=bithorde) + fs = fusell.FUSELL(ops, args.mountpoint, fsopts) + + if args.scan: + scanner = Thread(target=background_scan, args=(args, config)) + scanner.setDaemon(True) + else: + scanner = None + + return fs.mount(), (fs, scanner) + + +def main(fs, scanner): + if scanner: + scanner.start() + + try: + fs.run() + except Exception: + log.exception("Error in FuseFS") diff -Nru bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/bhindex/__init__.py bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/bhindex/__init__.py --- bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/bhindex/__init__.py 2017-08-26 17:32:27.000000000 +0000 +++ bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/bhindex/__init__.py 2017-10-20 13:01:54.000000000 +0000 @@ -1,5 +1,11 @@ +from contextlib import contextmanager import logging -from . import add, cat, config, links, scanner, syncer, tree, vacuum +from . import add, cat, config, fusefs, links, scanner, syncer, tree, vacuum + + +@contextmanager +def noop_context_manager(): + yield def main(args=None): @@ -18,6 +24,7 @@ CLI.add_argument('--setuid', dest="suid", help="Set username before running") CLI.add_argument('--verbose', '-v', action="store_true", help="write debug-level output") + CLI.set_defaults(setup=lambda args, cfg, db: (noop_context_manager(), (args, cfg, db))) subparsers = CLI.add_subparsers(title="Sub-commands") Add = subparsers.add_parser('add', help='Add files to BitHorde and BHIndex') @@ -32,6 +39,9 @@ LS = subparsers.add_parser('ls', help='List files in a directory of BHIndex') tree.prepare_ls_args(LS, cfg) + MOUNT = subparsers.add_parser('mount', help='Mount bhindex as a FUSE file system') + fusefs.prepare_args(MOUNT, cfg) + MV = subparsers.add_parser('mv', help='Move a file or directory in the bithorde tree') tree.prepare_mv_args(MV, cfg) @@ -51,14 +61,17 @@ else: lvl = logging.INFO logging.basicConfig(level=lvl, format="%(levelname)-8s %(asctime)-15s <%(name)s> %(message)s") + logging.getLogger().setLevel(lvl) db = DB(args.db) + ctx, main_args = args.setup(args, cfg, db) if args.suid: from pwd import getpwnam from os import setuid setuid(getpwnam(args.suid).pw_uid) - args.main(args, cfg, db) + with ctx: + args.main(*main_args) except ArgumentError, e: CLI.error(e) diff -Nru bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/bhindex/links.py bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/bhindex/links.py --- bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/bhindex/links.py 2017-08-26 17:32:27.000000000 +0000 +++ bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/bhindex/links.py 2017-10-20 13:01:54.000000000 +0000 @@ -5,7 +5,7 @@ import os.path as path from time import time -from distdb import ANY +from distdb import Key, ObjId from bithorde import Client, parseConfig from .tree import Filesystem @@ -103,9 +103,12 @@ t = time() count = size = 0 - crit = {'directory': ANY, 'xt': ANY} + crit = ( + ObjId.startswith('tree:tiger:'), + Key('directory').any(), + ) if not force_all: - crit['@linked'] = None + crit += (Key('@linked').missing(),) objs = self.db.query(crit) for obj, status_ok in cachedAssetLiveChecker(self.bithorde, objs, db=self.db): if not status_ok: diff -Nru bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/bhindex/links_test.py bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/bhindex/links_test.py --- bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/bhindex/links_test.py 2017-08-26 17:32:27.000000000 +0000 +++ bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/bhindex/links_test.py 2017-10-20 13:01:54.000000000 +0000 @@ -68,7 +68,7 @@ t.update(Object(u"dir:apa", { u'directory': TimedValues(u"dir:/apa"), })) - t.update(Object('some_file', { + t.update(Object('tree:tiger:ASDASDSADASDASDSADASDASDSADASDASDSADASD', { u'directory': TimedValues(u"dir:apa/movie"), u'xt': TimedValues(xt), })) diff -Nru bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/bhindex/scanner.py bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/bhindex/scanner.py --- bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/bhindex/scanner.py 2017-08-26 17:32:27.000000000 +0000 +++ bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/bhindex/scanner.py 2017-10-20 13:01:54.000000000 +0000 @@ -10,12 +10,13 @@ from bithorde import Client, parseConfig from bhindex.util import cachedAssetLiveChecker -from distdb import ANY, Sorting, TimedBefore +from distdb import Key, ObjId, Sort SAFETY = 60 FUDGE = 30 MAX_BATCH = 1000 MIN_SLEEP = 5 +UNCHECKED_SCAN_INTERVAL = 360 class Scanner(object): @@ -28,20 +29,34 @@ self.processed = 0 self.size = 0 self.log = getLogger('scanner') + self.last_unchecked_scan = 0 + + def pick_stale(self, expires_before): + q = self.db.query_keyed(( + ObjId.startswith('tree:tiger:'), + Key('bh_availability').timed_before(expires_before), + ), '+bh_availability', fields=self.fields, + sortmeth=Sort.timestamp) + for _, obj in q: + yield obj.getitem('bh_availability').t, obj.id + + def pick_unchecked(self): + q = self.db.query_ids(( + ObjId.startswith('tree:tiger:'), + Key('bh_availability').missing(), + )) + for id in q: + yield 0, id def pick_batch(self, expires_before, limit=MAX_BATCH): - objs = chain( - ((0, id) for id in self.db.query_ids({ - 'xt': ANY, - 'bh_availability': None, - })), - ((obj.getitem('bh_availability').t, obj.id) for _, obj in - self.db.query_keyed({ - 'xt': ANY, - 'bh_availability': TimedBefore(expires_before), - }, '+bh_availability', fields=self.fields, - sortmeth=Sorting.timestamp)), - ) + objs = self.pick_stale(expires_before) + + # pick_unchecked is slow, and unlikely to yield anything + # so only run it in big intervals + if self.last_unchecked_scan < expires_before - UNCHECKED_SCAN_INTERVAL: + objs = chain(self.pick_unchecked(), objs) + self.last_unchecked_scan = expires_before + return islice(objs, limit) def _fudged_batch(self, expires_before, fudge=FUDGE): @@ -93,7 +108,7 @@ scanner = Scanner(db, bithorde) if args.all_objects: - scanner.run_batch(db.query({'xt': ANY}, scanner.fields)) + scanner.run_batch(db.query(ObjId.startswith('tree:tiger:'), scanner.fields)) scanner.log.info("Scanned %d assets. %d available, totaling %d GB", scanner.processed, scanner.available, scanner.size / (1024 * 1024 * 1024)) else: diff -Nru bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/bhindex/tree.py bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/bhindex/tree.py --- bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/bhindex/tree.py 2017-08-26 17:32:27.000000000 +0000 +++ bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/bhindex/tree.py 2017-10-20 13:01:54.000000000 +0000 @@ -3,9 +3,9 @@ from time import time from warnings import warn -from distdb import Object, Starts, Sorting +from distdb import Object, Key, Sort from .bithorde import Identifiers -from .util import hasValidStatus +from .util import hasValidStatus, set_new_availability, updateFolderAvailability log = getLogger('tree') @@ -90,8 +90,8 @@ def _ls(self, t): dirids = set("%s" % dirobj.id for dirobj in self.objs) children = self.db.query_keyed( - {'directory': Starts("%s/" % d for d in dirids)}, key="+directory", - sortmeth=Sorting.split('/'), fields=('directory', 'xt', 'bh_availability'), + Key('directory').startswith("%s/" % d for d in dirids), key="+directory", + sortmeth=Sort.split('/'), fields=('directory', 'xt', 'bh_availability'), ) for dirent, child in _filterAvailable(children, t): try: @@ -108,7 +108,7 @@ def __getitem__(self, key): objs = list() for obj in self.objs: - objs += self.db.query({u'directory': "%s/%s" % (obj.id, key)}) + objs += self.db.query(Key('directory') == "%s/%s" % (obj.id, key)) if objs: return self._map(objs, key) else: @@ -117,10 +117,11 @@ def mkdir(self, name, t=None, tr=None): try: return self[name] - except: + except LookupError: directory_attr = u'%s/%s' % (self.objs[0].id, name) new = Object.new('dir') new.set('directory', directory_attr, t=t) + set_new_availability(new, True) with tr or Filesystem.db_transaction(self.db) as tr: tr.update(new) return Directory(self, (new,)) @@ -136,6 +137,7 @@ obj.set('directory', dir | {directory_attr}, t=t) with tr or Filesystem.db_transaction(self.db) as tr: tr.update(obj) + updateFolderAvailability(tr, obj, t=t) def rm(self, name, t=None, tr=None): try: @@ -261,19 +263,37 @@ dir = dir.mkdir(segment, t=t, tr=tr) return dir + def _dir_name(self, x, lookup=None): + lookup = lookup or self.lookup + if isinstance(x, Path): + return lookup(x[:-1]), x[-1] + elif isinstance(x, tuple): + dir, name = x + if not isinstance(dir, Directory): + dir = lookup(dir) + return dir, name + else: + raise TypeError + def mv(self, src, dst, t=None, tr=None): try: self.lookup(dst) raise FoundError("%s already exist") except NotFoundError: pass - src_dir = self.lookup(src[:-1]) - target = src_dir[src[-1]] + if t is None: + t = time() + + src_dir, src_name = self._dir_name(src) + subject = src_dir[src_name] with tr or self.transaction() as tr: - dst_dir = self.mkdir(dst[:-1], t=t, tr=tr) - dst_dir.link(dst[-1], target, t=t, tr=tr) - src_dir.rm(src[-1], t=t, tr=tr) + def mkdir(x): + return self.mkdir(x, t=t, tr=tr) + dst_dir, dst_name = self._dir_name(dst, lookup=mkdir) + + dst_dir.link(dst_name, subject, t=t, tr=tr) + src_dir.rm(src_name, t=t, tr=tr) def prepare_ls_args(parser, config): diff -Nru bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/bhindex/util.py bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/bhindex/util.py --- bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/bhindex/util.py 2017-08-26 17:32:27.000000000 +0000 +++ bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/bhindex/util.py 2017-10-20 13:01:54.000000000 +0000 @@ -227,7 +227,7 @@ def _checkAsset(bithorde, obj, now): - ids = parseHashIds(obj.get('xt', tuple())) + ids = parseHashIds(obj.get('xt') or obj.id) if not ids: return obj, None diff -Nru bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/bhindex/vacuum.py bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/bhindex/vacuum.py --- bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/bhindex/vacuum.py 2017-08-26 17:32:27.000000000 +0000 +++ bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/bhindex/vacuum.py 2017-10-20 13:01:54.000000000 +0000 @@ -2,7 +2,7 @@ import logging from bhindex.util import validAvailability, Counter -from distdb import ANY +from distdb import Key log = logging.getLogger("vacuum") @@ -19,7 +19,7 @@ total = Counter() wiped = Counter() with db.transaction() as tr: - for obj in db.query({'bh_availability': ANY}): + for obj in db.query(Key('bh_availability').any()): total.inc() if (validAvailability(obj, t) or 0) < availability: tr.delete(obj, t) diff -Nru bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/debian/bhindex-mount.service bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/debian/bhindex-mount.service --- bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/debian/bhindex-mount.service 1970-01-01 00:00:00.000000000 +0000 +++ bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/debian/bhindex-mount.service 2017-10-20 13:01:54.000000000 +0000 @@ -0,0 +1,12 @@ +[Unit] +Description=BHIndex FUSE Mount +Wants=network.target + +[Service] +ExecStart=/usr/bin/bhindex --setuid nobody mount /media/bhindex +ExecStopPost=/bin/umount -fl /media/bhindex +Restart=always +RestartSec=2 + +[Install] +WantedBy=multi-user.target diff -Nru bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/debian/bhindex-mount.upstart bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/debian/bhindex-mount.upstart --- bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/debian/bhindex-mount.upstart 1970-01-01 00:00:00.000000000 +0000 +++ bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/debian/bhindex-mount.upstart 2017-10-20 13:01:54.000000000 +0000 @@ -0,0 +1,10 @@ + +start on (filesystem and net-device-up IFACE!=lo) + +console log + +exec /usr/bin/bhindex --setuid nobody mount /media/bhindex +post-stop exec /bin/umount -fl /media/bhindex + +respawn +respawn limit 5 60 diff -Nru bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/debian/changelog bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/debian/changelog --- bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/debian/changelog 2017-08-26 17:32:27.000000000 +0000 +++ bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/debian/changelog 2017-10-20 13:01:54.000000000 +0000 @@ -1,8 +1,8 @@ -bhindex (0.7~201708261727+5f343b5~ubuntu16.04.1) xenial; urgency=low +bhindex (0.7~201710201247+d36d23f~ubuntu16.04.1) xenial; urgency=low * Auto build. - -- Ulrik Mikaelsson Sat, 26 Aug 2017 17:32:27 +0000 + -- Ulrik Mikaelsson Fri, 20 Oct 2017 13:01:54 +0000 bhindex (0.7) UNRELEASED; urgency=medium diff -Nru bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/debian/git-build-recipe.manifest bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/debian/git-build-recipe.manifest --- bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/debian/git-build-recipe.manifest 2017-08-26 17:32:27.000000000 +0000 +++ bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/debian/git-build-recipe.manifest 2017-10-20 13:01:54.000000000 +0000 @@ -1,2 +1,2 @@ -# git-build-recipe format 0.4 deb-version {debupstream}~201708261727+5f343b5 -lp:bhindex git-commit:5f343b5e9441e32f12a96a4f00aa63bbd5f79e14 +# git-build-recipe format 0.4 deb-version {debupstream}~201710201247+d36d23f +lp:bhindex git-commit:d36d23f31f81b779a63aef89f125841a948ddf3b diff -Nru bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/debian/rules bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/debian/rules --- bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/debian/rules 2017-08-26 17:32:27.000000000 +0000 +++ bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/debian/rules 2017-10-20 13:01:54.000000000 +0000 @@ -8,12 +8,15 @@ override_dh_systemd_enable: dh_systemd_enable --name=bhindex-syncer + dh_systemd_enable --name=bhindex-mount override_dh_installinit: dh_installinit --restart-after-upgrade --name=bhindex-syncer + dh_installinit --restart-after-upgrade --name=bhindex-mount override_dh_systemd_start: dh_systemd_start --restart-after-upgrade --name=bhindex-syncer + dh_systemd_start --restart-after-upgrade --name=bhindex-mount override_dh_auto_install: dh_auto_install diff -Nru bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/distdb/database.py bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/distdb/database.py --- bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/distdb/database.py 2017-08-26 17:32:27.000000000 +0000 +++ bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/distdb/database.py 2017-10-20 13:01:54.000000000 +0000 @@ -5,118 +5,14 @@ import concurrent -from obj import Object, TimedValues, ANY +from obj import Object, TimedValues +from query import Key as QueryKey, Condition, Query, Sort from _setup import create_DB # Pointers to empty list will be wiped after 30 days. DEFAULT_GRACE = 3600 * 24 * 30 -# Matcher-class to match a prefix of a value -class Starts(tuple): - def __new__(cls, v): - if not hasattr(v, '__iter__'): - v = (v,) - return super(Starts, cls).__new__(cls, v) - - -class TimedBefore(int): - pass - - -def _sql_query_starts(k, v): - crit = ' OR '.join("value GLOB '%s*'" % x.replace("'", "''") for x in v) - query = """SELECT DISTINCT objid FROM map - WHERE keyid = (SELECT keyid FROM key WHERE key = ?) - AND - listid IN (SELECT listid FROM list WHERE (%s))""" % crit - return (query, (k,)) - - -# Generate an SQL condition that finds object with matching criteria -def _sql_condition(k, v): - equal_query = """SELECT objid FROM map - NATURAL JOIN list - NATURAL JOIN key - WHERE LIKELIHOOD(key = ?, 0.9375) AND UNLIKELY(list.value = ?)""" - any_query = """SELECT objid FROM map - NATURAL JOIN key - WHERE key = ? AND listid NOT NULL""" - timed_before_query = """SELECT objid FROM map - NATURAL JOIN key - WHERE LIKELIHOOD(key = ?, 0.9375) AND listid IS NOT NULL AND timestamp < ?""" - absent_query = """SELECT DISTINCT objid FROM map AS ref - WHERE LIKELIHOOD(listid NOT NULL, 0.9375) AND NOT EXISTS ( - SELECT 1 FROM map - NATURAL JOIN key - WHERE ref.objid = map.objid AND key = ? AND LIKELIHOOD(listid NOT NULL, 0.9375) - )""" - if v is ANY: - return (any_query, (k,)) - elif v is None: - return (absent_query, (k,)) - elif isinstance(v, Starts): - return _sql_query_starts(k, v) - elif isinstance(v, TimedBefore): - return (timed_before_query, (k, v)) - else: - return (equal_query, (k, v)) - - -def _sql_for_query(crit): - sql = [] - params = [] - for k, v in crit.iteritems(): - sql_fragment, new_params = _sql_condition(k, v) - sql.append(sql_fragment) - params += new_params - - return " INTERSECT ".join(sql), params - - -def _parse_sort(sort): - direction, key = sort[:1], sort[1:] - if direction == '+': - return "ASC", key - elif direction == '-': - return "DESC", key - else: - raise ValueError("Direction specifier %s in sort=%s is not valid" % (direction, sort)) - - -def _sql_for_keyed_query(crit, key, offset, sortmeth): - direction, key = _parse_sort(key) - selection, params = _sql_for_query(crit) - sort_key, sort_params = sortmeth() - if selection: - selection = "(%s) NATURAL JOIN map" % selection - else: - selection = "map" - selection = """SELECT DISTINCT objid, value FROM %s -NATURAL JOIN key -NATURAL JOIN list - WHERE key = '%s' ORDER BY %s %s, objid""" % (selection, key, sort_key, direction) - - if offset: - selection = selection + (" LIMIT -1 OFFSET %d" % offset) - - return selection, params + sort_params - - -class Sorting: - @staticmethod - def default_sort(): - return "value", [] - - @staticmethod - def split(character): - return lambda: ("substr(value, instr(value, ?))", [character]) - - @staticmethod - def timestamp(): - return "timestamp", [] - - class Transaction(object): DEFERRED = "DEFERRED" IMMEDIATE = "IMMEDIATE" @@ -197,7 +93,7 @@ if not isinstance(assignment, TimedValues): assignment = TimedValues(assignment) objid = self.db._get_id('obj', objid) - keyid = self.db._getKeyId(key) + keyid = self.db.keys(key) tstamp = self.db._query_single( "SELECT timestamp FROM map WHERE objid = ? AND keyid = ?", (objid, keyid)) if assignment.t > tstamp: @@ -219,7 +115,7 @@ cursor = self.conn.cursor() for key in obj._dirty: assignment = _dict[key] - keyid = self.db._getKeyId(key) + keyid = self.db.keys(key) old_timestamp = self.db._query_single( "SELECT timestamp FROM map WHERE objid = ? and keyid = ?", (objid, keyid)) if not old_timestamp or assignment.t >= old_timestamp: @@ -278,17 +174,72 @@ self._pending.append(obj) -class DB(object): - ANY = ANY - Starts = Starts +class Key(int): + __slots__ = ('name') + + def __new__(cls, id, name): + res = super(Key, cls).__new__(cls, id) + res.name = name + return res + + +class Keys(dict): + def __init__(self, db): + self.db = db + def __call__(self, k): + if isinstance(k, Key): + return k + v = self.get(k, None) or self._read(k) + return v + + def _read(self, name): + id = self.db._get_id('key', name) + self[id] = self[name] = Key(id, name) + return self[id] + + +def _parse_sort(sort): + direction, key = sort[:1], sort[1:] + if direction == '+': + return Sort.ASCENDING, key + elif direction == '-': + return Sort.DESCENDING, key + else: + raise ValueError("Direction specifier %s in sort=%s is not valid" % (direction, sort)) + + +class DBQuery(Query): + def __init__(self, db, keys): + self.db = db + super(DBQuery, self).__init__(keys) + + def optimize_conditions(self, crit): + if isinstance(crit, QueryKey) and isinstance(crit.v, str): + crit.v = self.db.keys(crit.v) + return crit + if isinstance(crit, Condition): + conditions = [self.optimize_conditions(x) for x in crit] + return type(crit)(*conditions) + return crit + + def apply(self): + self.criteria = self.optimize_conditions(self.criteria) + return super(DBQuery, self).apply() + + def __iter__(self): + q, args = self.apply() + return iter(self.db._query_all(q, args)) + + +class DB(object): def __init__(self, path): self.path = path self.conn = sqlite3.connect( path, timeout=60, isolation_level=None, check_same_thread=False) self.cursor = self.conn.cursor() create_DB(self.conn) - self.__keyCache = dict() + self.keys = Keys(self) self.lock = concurrent.ThreadLock() self.in_transaction = None @@ -337,14 +288,6 @@ "SELECT %sid FROM %s WHERE %s = ?" % (tbl, tbl, tbl), (id,)) return objid - def _getKeyId(self, id): - try: - return self.__keyCache[id] - except KeyError: - id = self._get_id('key', id) - self.__keyCache[id] = id - return id - def get(self, obj, fields=None): if isinstance(obj, int): objid = obj @@ -367,25 +310,30 @@ def __getitem__(self, obj): return self.get(obj) - def query_ids(self, criteria): - query, params = _sql_for_query(criteria) - query = "SELECT obj FROM obj NATURAL JOIN (%s)" % query - for objid, in self._query_all(query, params): - yield objid - - def query_raw_ids(self, criteria): - query, params = _sql_for_query(criteria) - for objid, in self._query_all(query, params): - yield objid + def _select(self, *columns): + return DBQuery(self, columns) def query(self, criteria, fields=None): - query, params = _sql_for_query(criteria) - for objid, in self._query_all(query, params): + if isinstance(criteria, Condition): + criteria = (criteria,) + for objid, in self._select('objid').where(*criteria): yield self.get(objid, fields) - def query_keyed(self, criteria, key, offset=0, fields=None, sortmeth=Sorting.default_sort): - query, params = _sql_for_keyed_query(criteria, key, offset, sortmeth) - for objid, key_value in self._query_all(query, params): + def query_ids(self, criteria, fields=None): + if isinstance(criteria, Condition): + criteria = (criteria,) + for objid, in self._select('objid').where(*criteria): + yield objid + + def query_keyed(self, criteria, key, fields=None, sortmeth=Sort.value): + if isinstance(criteria, Condition): + criteria = (criteria,) + direction, key = _parse_sort(key) + key_crit = [c for c in criteria if c.requires_key(key)] or [QueryKey(key).any()] + other_crit = [c for c in criteria if not c.requires_key(key)] + for objid, key_value in self._select('objid', 'value') \ + .where(*(key_crit + other_crit)) \ + .order_by(sortmeth, direction): yield key_value, self.get(objid, fields) def _get_list_id(self, values): diff -Nru bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/distdb/database_test.py bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/distdb/database_test.py --- bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/distdb/database_test.py 2017-08-26 17:32:27.000000000 +0000 +++ bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/distdb/database_test.py 2017-10-20 13:01:54.000000000 +0000 @@ -5,7 +5,8 @@ from nose.tools import * from distdb.obj import TimedValues, Set, Object -from distdb.database import ANY, DB, Starts +from distdb.database import DB +from distdb.query import Key HOURS = 3600 @@ -87,11 +88,11 @@ assert_in(u'name', self.db[self.o.id]) def test_query(self): - assert_not_in(self.o, self.db.query({u'key': 'Not here'})) - assert_not_in(self.o, self.db.query({u'nokey': ANY})) - assert_in(self.o, list(self.db.query({u'key': u'Test Person'}))) - assert_in(self.o, list(self.db.query({u'apa': None}))) - assert_in(self.o, list(self.db.query({u'key': Starts('Test')}))) + assert_not_in(self.o, self.db.query(Key('key') == 'Not here')) + assert_not_in(self.o, self.db.query(Key('nokey').any())) + assert_in(self.o, list(self.db.query(Key('key') == u'Test Person'))) + assert_in(self.o, list(self.db.query(Key('apa').missing()))) + assert_in(self.o, list(self.db.query(Key('key').startswith('Test')))) def test_query_keyed(self): p1 = self.db.get("some_id") @@ -106,12 +107,6 @@ (u"Test Person", p1), ]) - def test_query_objids(self): - assert_not_in(self.o.id, self.db.query_ids({u'key': 'Not here'})) - assert_in(self.o.id, list(self.db.query_ids({u'key': u'Test Person'}))) - assert_in(self.o.id, list(self.db.query_ids({u'apa': None}))) - assert_in(self.o.id, list(self.db.query_ids({u'key': Starts('Test')}))) - def test_update_empty(self): self.o[u'key'] = Set([]) with self.db.transaction() as t: @@ -155,7 +150,7 @@ with self.db.transaction() as t: t.delete(o) assert_equal(db[o.id], Object(o.id)) - assert_equal(list(db.query({"key": ANY})), []) + assert_equal(list(db.query(Key("key").any())), []) def test_vacuum(self): db, o = self.db, self.o @@ -193,3 +188,14 @@ assert_equal(self.db.get_sync_state('my_peer'), {"last_received": 0}) self.db.set_sync_state('my_peer', 2) assert_equal(self.db.get_sync_state('my_peer'), {"last_received": 2}) + + def test_select(self): + assert_equal( + self.db._select('objid').where(Key('key').any()).apply(), + ("SELECT objid FROM map WHERE LIKELIHOOD(keyid=?, 0.2) AND listid IS NOT NULL", (1,)) + ) + + assert_equal( + self.db._select('objid').where(Key('NONE_EXISTING').any()).apply(), + ("SELECT objid FROM map WHERE LIKELIHOOD(keyid=?, 0.2) AND listid IS NOT NULL", (2,)) + ) diff -Nru bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/distdb/__init__.py bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/distdb/__init__.py --- bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/distdb/__init__.py 2017-08-26 17:32:27.000000000 +0000 +++ bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/distdb/__init__.py 2017-10-20 13:01:54.000000000 +0000 @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- -from database import ANY, DB, Sorting, Starts, TimedBefore, Transaction, AsyncCommitter +from database import DB, Transaction, AsyncCommitter from obj import Object +from query import Key, ObjId, Sort open = DB diff -Nru bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/distdb/query.py bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/distdb/query.py --- bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/distdb/query.py 1970-01-01 00:00:00.000000000 +0000 +++ bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/distdb/query.py 2017-10-20 13:01:54.000000000 +0000 @@ -0,0 +1,277 @@ +def _mkset(x): + if isinstance(x, frozenset): + return x + elif isinstance(x, (set, tuple)): + return frozenset(x) + else: + return frozenset((x,)) + + +def _mktuple(x): + if isinstance(x, tuple): + return x + elif isinstance(x, (list)): + return tuple(x) + else: + return tuple((x,)) + + +class ConditionExpr(tuple): + def __new__(cls, sources, expr, params, condition_on_key): + sources = _mkset(sources) + params = _mktuple(params) + return super(ConditionExpr, cls).__new__(cls, (sources, expr, params, condition_on_key)) + + @property + def condition_on_key(self): + return self[3] + + def merge(self, other): + (s1, e1, p1, ck1) = self + (s2, e2, p2, ck2) = other + if isinstance(e1, basestring): + e1 = (e1,) + if isinstance(e2, basestring): + e2 = (e2,) + return ConditionExpr(s1 | s2, e1 + e2, p1 + p2, ck1 or ck2) + + +class ObjId: + def apply(self): + return ConditionExpr('obj', '', (), False) + + def __eq__(self, v): + return Matcher.build((), self, (Equals, 'obj', v)) + + def startswith(self, v): + return Matcher.build((), self, (Starts, 'obj', v)) + + +ObjId = ObjId() + + +class Condition(object): + def __init__(self, *args): + self.children = tuple(args) + + def __eq__(self, other): + return type(self) == type(other) \ + and len(self.children) == len(other.children) \ + and all(a == b for a, b in zip(self.children, other.children)) + + def __len__(self): + return len(self.children) + + def __iter__(self): + return iter(self.children) + + def __repr__(self): + return "%s%r" % (type(self).__name__, self.children) + + def requires_key(self, key): + if len(self.children) > 0: + x = self.children[0] + if isinstance(x, Key) and x.v == key: + return True + else: + return False + else: + return False + + +class Starts(Condition): + def apply(self): + key, value = self + expr = "%s GLOB '%s*'" % (key, value.replace("'", "''")) + return ConditionExpr((), expr, (), False) + + +class Equals(Condition): + def apply(self): + key, value = self + expr = "%s=?" % key + return ConditionExpr((), expr, value, False) + + +class TimedBefore(Condition): + def apply(self): + key, timestamp = self + sources, key_expr, key_params, condition_on_key = key.apply() + return ConditionExpr(sources, "%slistid IS NOT NULL AND timestamp < ?" % key_expr, key_params + (timestamp,), condition_on_key) + + +class KeyAny(Condition): + def apply(self): + key, = self + sources, key_expr, key_params, condition_on_key = key.apply() + return ConditionExpr(sources, "%slistid IS NOT NULL" % key_expr, key_params, condition_on_key) + + +class KeyMissing(Condition): + def apply(self): + key, = self + key_expr, key_params = Query(('objid',), key.any()).apply() + return ConditionExpr((), "objid NOT IN (%s)" % key_expr, key_params, False) + + def requires_key(self, _): + return False + + +class Key(object): + def __init__(self, v): + self.v = v + + def apply(self): + if isinstance(self.v, int): + return ConditionExpr((), "LIKELIHOOD(keyid=?, 0.2) AND ", (self.v,), True) + else: + return ConditionExpr('key', "LIKELIHOOD(key=?, 0.2) AND ", (self.v,), True) + + def __eq__(self, v): + return Matcher.build('list', self, (Equals, 'value', v)) + + def startswith(self, v): + return Matcher.build('list', self, (Starts, 'value', v)) + + def timed_before(self, t): + return TimedBefore(self, t) + + def any(self): + return KeyAny(self) + + def missing(self): + return KeyMissing(self) + + def __repr__(self): + return "Key(%r)" % self.v + + +class Matcher(Condition): + def apply(self): + extra_sources, key, comparer = self + key_sources, key_prefix, key_params, condition_on_key1 = key.apply() + sources, expression, params, condition_on_key2 = comparer.apply() + + sources = key_sources | extra_sources | sources + expression = "%s(%s)" % (key_prefix, expression) + params = key_params + params + condition_on_key = condition_on_key1 | condition_on_key2 + return ConditionExpr(sources, expression, params, condition_on_key) + + @classmethod + def build(cls, extra_sources, what, compare_args): + comparer = cls.build_comparer(*compare_args) + return cls(_mkset(extra_sources), what, comparer) + + @classmethod + def build_comparer(cls, func, key, value): + if hasattr(value, '__iter__'): + comparers = (cls.build_comparer(func, key, v) for v in value) + return OrCondition(*comparers) + else: + return func(key, value) + + +class ListCondition(Condition): + def __init__(self, *args): + def check(x): + assert isinstance(x, Condition) + return x + super(ListCondition, self).__init__(*(check(x) for x in args)) + + def apply(self): + res = ConditionExpr((), (), (), False) + for x in self: + res = res.merge(self.expression_for(res, x)) + + sources, expressions, params, condition_on_key = res + expression = "%s" % (self.join(expressions)) + return ConditionExpr(sources, expression, params, condition_on_key) + + def expression_for(self, res, x): + return x.apply() + + +class AndCondition(ListCondition): + join = ' AND '.join + + def expression_for(self, res, x): + sources, expression, params, condition_on_key = super(AndCondition, self).expression_for(res, x) + if res.condition_on_key and condition_on_key: + cond, params = Query(('objid',), x).apply() + expression = '(objid IN (%s))' % cond + sources = _mkset(()) + else: + expression = "(%s)" % expression + + return ConditionExpr(sources, expression, params, condition_on_key) + + +class OrCondition(ListCondition): + join = ' OR '.join + + +class Sort: + ASCENDING = "ASC" + DESCENDING = "DESC" + + @staticmethod + def value(): + return "value", () + + @staticmethod + def split(character): + return lambda: ("substr(value, instr(value, ?))", (character,)) + + @staticmethod + def timestamp(): + return "timestamp", () + + +class Query(object): + def __init__(self, columns, criteria=()): + self.columns = list(columns) + self.sources = set(('map',)) + self.sorting = () + + if isinstance(criteria, Condition): + self.where(criteria) + else: + self.where(*criteria) + + if 'obj' in columns: + self.sources.add('obj') + if 'value' in columns: + self.sources.add('list') + if 'key' in columns: + self.sources.add('key') + + def apply(self): + cols = ', '.join(self.columns) + extra_sources, where_expr, params, _ = self.criteria.apply() + sources = ' NATURAL JOIN '.join(self.sources | frozenset(extra_sources)) + + expr = "SELECT %s FROM %s" % (cols, sources) + if where_expr: + expr += " WHERE %s" % where_expr + + if self.sorting: + meth, direction = self.sorting + sort_key, sort_params = meth() + expr += " ORDER BY %s %s""" % (sort_key, direction) + params += sort_params + + return expr, params + + def where(self, *conditions): + for x in conditions: + assert isinstance(x, Condition) + if len(conditions) == 1: + self.criteria = conditions[0] + else: + self.criteria = AndCondition(*conditions) + return self + + def order_by(self, meth, direction=Sort.ASCENDING): + self.sorting = (meth, direction) + return self diff -Nru bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/distdb/query_test.py bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/distdb/query_test.py --- bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/distdb/query_test.py 1970-01-01 00:00:00.000000000 +0000 +++ bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/distdb/query_test.py 2017-10-20 13:01:54.000000000 +0000 @@ -0,0 +1,98 @@ +from nose.tools import * +from distdb.query import * +from distdb.query import _mkset as fz + + +def test_simple_queries(): + baseq = 'SELECT objid, value FROM map NATURAL JOIN list ' + + assert_equal( + Query(('objid', 'value')).where(Key('xt').any()).apply(), + (baseq + 'NATURAL JOIN key WHERE LIKELIHOOD(key=?, 0.2) AND listid IS NOT NULL', ('xt',)), + ) + + assert_equal( + Query(('objid', 'value')).where(Key(14).any()).apply(), + (baseq + 'WHERE LIKELIHOOD(keyid=?, 0.2) AND listid IS NOT NULL', (14,)), + ) + + +def test_Key(): + assert_equal( + Key('xt').any().apply(), + ConditionExpr(fz('key'), 'LIKELIHOOD(key=?, 0.2) AND listid IS NOT NULL', ('xt'), True), + ) + + assert_equal( + Key(14).any().apply(), + ConditionExpr(fz(()), 'LIKELIHOOD(keyid=?, 0.2) AND listid IS NOT NULL', (14), True), + ) + + assert_equal( + (Key(14) == 'monkey').apply(), + ConditionExpr(fz('list'), 'LIKELIHOOD(keyid=?, 0.2) AND (value=?)', (14, 'monkey'), True), + ) + + assert_equal( + (Key(14).startswith('monkey')).apply(), + ConditionExpr(fz('list'), "LIKELIHOOD(keyid=?, 0.2) AND (value GLOB 'monkey*')", (14), True), + ) + + assert_equal( + (Key(14).startswith(('monkey', 'banana'))).apply(), + ConditionExpr(fz('list'), "LIKELIHOOD(keyid=?, 0.2) AND (value GLOB 'monkey*' OR value GLOB 'banana*')", (14), True), + ) + + assert_equal( + (Key(14).timed_before(42)).apply(), + ConditionExpr(fz(()), "LIKELIHOOD(keyid=?, 0.2) AND listid IS NOT NULL AND timestamp < ?", (14, 42), True), + ) + + assert_equal( + (Key(14).missing()).apply(), + ConditionExpr(fz(()), "objid NOT IN (SELECT objid FROM map WHERE LIKELIHOOD(keyid=?, 0.2) AND listid IS NOT NULL)", (14), False), + ) + + +def test_multiple_And_Key_Crit(): + assert_equal( + AndCondition(Key('xt').any(), Key(32).any()).apply(), + ConditionExpr( + fz('key'), + '(LIKELIHOOD(key=?, 0.2) AND listid IS NOT NULL) AND (objid IN' + ' (SELECT objid FROM map WHERE LIKELIHOOD(keyid=?, 0.2) AND listid IS NOT NULL))', + ('xt', 32), + True + ), + ) + + assert_equal( + AndCondition(ObjId == 'apa', Key(32).any()).apply(), + ConditionExpr( + fz(('obj')), + '((obj=?)) AND (LIKELIHOOD(keyid=?, 0.2) AND listid IS NOT NULL)', + ('apa', 32), + True + ), + ) + + +def test_ObjId(): + assert_equal( + (ObjId == 'monkey').apply(), + ConditionExpr(fz('obj'), '(obj=?)', ('monkey',), False), + ) + + assert_equal( + (ObjId.startswith('monkey')).apply(), + ConditionExpr(fz('obj'), "(obj GLOB 'monkey*')", (), False), + ) + + +def test_Sorting(): + baseq = 'SELECT objid, value FROM map NATURAL JOIN list ' + + assert_equal( + Query(('objid', 'value')).where(Key('xt').any()).order_by(Sort.value, Sort.ASCENDING).apply(), + (baseq + 'NATURAL JOIN key WHERE LIKELIHOOD(key=?, 0.2) AND listid IS NOT NULL ORDER BY value ASC', ('xt',)), + ) diff -Nru bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/fusefs.py bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/fusefs.py --- bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/fusefs.py 2017-08-26 17:32:27.000000000 +0000 +++ bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/fusefs.py 1970-01-01 00:00:00.000000000 +0000 @@ -1,360 +0,0 @@ -#!/usr/bin/env python - -from __future__ import division, print_function, absolute_import - -import atexit, os, sys, warnings - -import fusell -import errno -import stat -import os.path as path -from time import time -from types import GeneratorType -import sqlite3 -import logging -from collections import defaultdict - -import itertools - -from bhindex.util import hasValidStatus, timed - -from bithorde import Client, parseConfig, parseHashIds, message - -log = logging.getLogger() - -# For Python 2 + 3 compatibility -if sys.version_info[0] == 2: - def next(it): - return it.next() -else: - buffer = memoryview - -current_uid = os.getuid() -current_gid = os.getgid() - -ino_source = itertools.count(1) -fh_source = itertools.count(1) - -class Pool(set): - def __init__(self, create): - self._create = create - set.__init__(self) - def get(self): - try: - return self.pop() - except KeyError: - return self._create() - def put(self, x): - self.add(x) - -ino_pool = Pool(create=lambda: next(ino_source)) -fh_pool = Pool(create=lambda: next(fh_source)) - -class INode(object): - MODE_0755 = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH - MODE_0555 = stat.S_IRUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH - - def __init__(self): - super(INode, self).__init__() - self.ino = ino_pool.get() - - def __del__(self): - ino_pool.put(self.ino) - - def entry(self): - entry = fusell.fuse_entry_param() - entry.ino = self.ino - entry.generation = 0 - entry.entry_timeout = 2 - entry.attr_timeout = 10 - - entry.attr = self.attr() - - return entry - - def attr(self): - attr = fusell.c_stat() - - attr.st_ino = self.ino - attr.st_mode = self.MODE_0555 - attr.st_nlink = 1 - attr.st_uid = current_uid - attr.st_gid = current_gid - attr.st_rdev = 1 - attr.st_size = 0 - - attr.st_blksize = 512 - attr.st_blocks = 1 - now = time() - attr.st_atime = now - attr.st_mtime = now - attr.st_ctime = now - - return attr - -class Timed: - def __init__(self, tag): - self.tag = tag - - def __enter__(self): - self.start = time() - return self - - def __exit__(self, type, value, traceback): - delta = (time() - self.start) * 1000 - log.debug("<%s>: %.1fms" % (self.tag, delta)) - -def timed(method): - def timed(*args, **kw): - with Timed("%r (%r, %r)" % (method.__name__, args, kw)): - res = method(*args, **kw) - if isinstance(res, GeneratorType): - return list(res) - else: - return res - - return result - - return timed - -import db, config -config = config.read() -DB=db.open(config.get('DB', 'file')) - -fields=set((u'directory', u'name', u'ext', u'xt', u'bh_status', u'bh_status_confirmed', u'bh_availability', u'filesize')) -def scan(directory_obj): - dir_prefix = directory_obj.id+'/' - for obj in DB.query({u'directory': db.Starts(dir_prefix)}, fields=fields): - name_found = 0 - for directory in obj['directory']: - if directory.startswith(dir_prefix): - name = directory[len(dir_prefix):] - if name: - name_found += 1 - yield name, obj - - if not name_found and obj.any('name'): - name = obj.any('name') - ext = obj.any('ext') - if ext: - if ext.startswith('.'): - name += ext - else: - name += ".%s" % obj.any('ext') - yield name, obj - -def map_objects(objs): - if any(o.any('xt') for o in objs): - if len(objs) > 1: - warnings.warn("TODO: Merge NON-directories") - else: - return File(objs[0]) - else: - return Directory(objs) - - -class File(INode): - def __init__(self, obj): - super(File, self).__init__() - self.obj = obj - - def attr(self): - attr = super(File, self).attr() - attr.st_mode |= stat.S_IFREG - attr.st_size = int(self.obj.any(u'filesize', 0)) - return attr - - def is_available(self): - return hasValidStatus(self.obj) - - def ids(self): - return parseHashIds(self.obj['xt']) - -class Symlink(INode): - def __init__(self, obj): - super(Symlink, self).__init__() - self.obj = obj - - def attr(self): - attr = super(Symlink, self).attr() - attr.st_mode |= stat.S_IFLNK - return attr - - def readlink(self): - return (u"/tmp/bhfuse/magnet:?xt=urn:" + self.obj.any('xt')).encode('utf8') - - def is_available(self): - return hasValidStatus(self.obj) - -class Directory(INode): - def __init__(self, objs): - super(Directory, self).__init__() - self.objs = objs - - def attr(self): - attr = super(Directory, self).attr() - attr.st_mode |= stat.S_IFDIR - attr.st_nlink = 2 - attr.st_size = 0 - return attr - - def is_available(self): - return hasValidStatus(self.objs) - - def lookup(self, name): - objs = list() - for obj in self.objs: - objs += DB.query({u'directory': u'%s/%s' % (obj.id, name)}, fields=fields) - - if not objs: - raise(fusell.FUSEError(errno.ENOENT)) - - return map_objects(objs) - - def readdir(self): - children = dict() - for obj in self.objs: - for name, obj in scan(obj): - if not hasValidStatus(obj): - continue - children.setdefault(name, []).append(obj) - - for name, objs in children.iteritems(): - inode = map_objects(objs) - if inode: - yield name, inode - -class Operations(fusell.FUSELL): - def __init__(self, bithorde, mountpoint, options): - self.root = Directory((DB['dir:'],)) - self.inode_open_count = defaultdict(int) - - self.inodes = { - fusell.ROOT_INODE: self.root - } - self.files = {} - - self.bithorde = bithorde - super(Operations, self).__init__(mountpoint, options) - - def _inode_resolve(self, ino, cls=INode): - try: - inode = self.inodes[ino] - assert isinstance(inode, cls) - return inode - except KeyError: - raise(fusell.FUSEError(errno.ENOENT)) - - @timed - def lookup(self, inode_p, name): - inode_p = self._inode_resolve(inode_p, Directory) - inode = inode_p.lookup(name.decode('utf-8')) - self.inodes[inode.ino] = inode - return inode.entry() - - def forget(self, ino, nlookup): - # Assuming the kernel only notifies when nlookup really reaches 0 - try: - del self.inodes[ino] - except: - warnings.warn('Tried to forget something already missing.') - - def getattr(self, inode): - inode = self._inode_resolve(inode) - return inode.attr() - - def opendir(self, inode): - inode = self._inode_resolve(inode, Directory) - return inode.ino - - @timed - def readdir(self, inode, off): - if off: - return - - directory = self._inode_resolve(inode, Directory) - - i = 1 - for name, inode in directory.readdir(): - #self.inodes[inode.ino] = inode - yield name.encode('utf8'), inode.attr(), i - i += 1 - - def releasedir(self, inode): - pass - - @timed - def open(self, inode, flags): - inode = self._inode_resolve(inode, File) - supported_flags = os.O_RDONLY | os.O_LARGEFILE - if (flags & supported_flags) != flags: - raise(fusell.FUSEError(errno.EINVAL)) - fh = fh_pool.get() - asset = self.bithorde.open(inode.ids()) - status = asset.status() - assert status and status.status == message.SUCCESS - self.files[fh] = asset - return fh - - @timed - def read(self, fh, off, size): - try: - f = self.files[fh] - except KeyError: - raise(fusell.FUSEError(errno.EBADF)) - - return f.read(off, size) - - def release(self, fh): - try: - del self.files[fh] - except: - warnings.warn("Trying to release unknown file handle: %s" % fh) - - def readlink(self, inode): - return self._inode_resolve(inode, Symlink).readlink() - - def statfs(self): - stat = fusell.c_statvfs - stat.f_bsize = 64*1024 - stat.f_frsize = 64*1024 - stat.f_blocks = stat.f_bfree = stat.f_bavail = 0 - stat.f_files = stat.f_ffree = stat.f_favail = 0 - return stat - - -def init_logging(): - formatter = logging.Formatter('%(message)s') - handler = logging.StreamHandler() - handler.setFormatter(formatter) - handler.setLevel(logging.DEBUG) - log.setLevel(logging.DEBUG) - log.addHandler(handler) - -if __name__ == '__main__': - init_logging() - try: - self, mountpoint = sys.argv - except: - raise SystemExit('Usage: %s ' % sys.argv[0]) - - bithorde = Client(parseConfig(config.items('BITHORDE')), autoconnect=False) - bithorde.connect() - - mount_point_created = None - if not os.path.exists(mountpoint): - os.mkdir(mountpoint) - mount_point_created = mountpoint - - def cleanup(remove_mountpoint): - if remove_mountpoint: - os.rmdir(remove_mountpoint) - - atexit.register(cleanup, mount_point_created) - - try: - print("Entering llfuse") - fsopts = [ 'fsname=bhindex', 'nonempty', 'debug', 'allow_other', 'max_read=65536', 'ro' ] - operations = Operations(bithorde=bithorde, mountpoint=mountpoint, options=fsopts) - except Exception, e: - log.exception("Error!", exc_info=True) diff -Nru bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/fusell.py bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/fusell.py --- bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/fusell.py 1970-01-01 00:00:00.000000000 +0000 +++ bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/fusell.py 2017-10-20 13:01:54.000000000 +0000 @@ -0,0 +1,723 @@ +# Copyright (c) 2010 Giorgos Verigakis +# +# Permission to use, copy, modify, and distribute this software for any +# purpose with or without fee is hereby granted, provided that the above +# copyright notice and this permission notice appear in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +from __future__ import division + +from contextlib import contextmanager +from ctypes import * +from ctypes.util import find_library as _find_library +from errno import * +from math import modf +from platform import machine, system +from stat import S_IFDIR +from os import path +import os + +from concurrent import Pool, trampoline + + +# Tries to locate a shared library, with fallbacks over ctypes.find_library +def find_library(lib): + libpath = _find_library(lib) + if libpath: + return libpath + + for libpath in ('/usr/lib/libfuse.so',): + if path.exists(libpath): + return libpath + + raise IOError("Shared library '%s' not found" % lib) + + +# FUSE Low C-binding declarations +_system = system() +_machine = machine() + +c_void_p_p = POINTER(c_void_p) + + +class c_timespec(Structure): + _fields_ = [('tv_sec', c_long), ('tv_nsec', c_long)] + + def __init__(self, value=None): + super(c_timespec, self).__init__() + if isinstance(value, float): + seconds, decimals = modf(value) + self.tv_sec = c_long(seconds) + self.tv_nsec = c_long(decimals * 1000000000) + + +class c_stat(Structure): + pass # Platform dependent + + +if _system == 'Darwin': + ENOTSUP = 45 + c_dev_t = c_int32 + c_fsblkcnt_t = c_ulong + c_fsfilcnt_t = c_ulong + c_gid_t = c_uint32 + c_mode_t = c_uint16 + c_off_t = c_int64 + c_pid_t = c_int32 + c_uid_t = c_uint32 + c_stat._fields_ = [ + ('st_dev', c_dev_t), + ('st_ino', c_uint32), + ('st_mode', c_mode_t), + ('st_nlink', c_uint16), + ('st_uid', c_uid_t), + ('st_gid', c_gid_t), + ('st_rdev', c_dev_t), + ('st_atimespec', c_timespec), + ('st_mtimespec', c_timespec), + ('st_ctimespec', c_timespec), + ('st_size', c_off_t), + ('st_blocks', c_int64), + ('st_blksize', c_int32)] +elif _system == 'Linux': + ENOTSUP = 95 + c_dev_t = c_ulonglong + c_fsblkcnt_t = c_ulonglong + c_fsfilcnt_t = c_ulonglong + c_gid_t = c_uint + c_mode_t = c_uint + c_off_t = c_longlong + c_pid_t = c_int + c_uid_t = c_uint + + if _machine == 'x86_64': + c_stat._fields_ = [ + ('st_dev', c_dev_t), + ('st_ino', c_ulong), + ('st_nlink', c_ulong), + ('st_mode', c_mode_t), + ('st_uid', c_uid_t), + ('st_gid', c_gid_t), + ('__pad0', c_int), + ('st_rdev', c_dev_t), + ('st_size', c_off_t), + ('st_blksize', c_long), + ('st_blocks', c_long), + ('st_atimespec', c_timespec), + ('st_mtimespec', c_timespec), + ('st_ctimespec', c_timespec)] + elif _machine == 'ppc': + c_stat._fields_ = [ + ('st_dev', c_dev_t), + ('st_ino', c_ulonglong), + ('st_mode', c_mode_t), + ('st_nlink', c_uint), + ('st_uid', c_uid_t), + ('st_gid', c_gid_t), + ('st_rdev', c_dev_t), + ('__pad2', c_ushort), + ('st_size', c_off_t), + ('st_blksize', c_long), + ('st_blocks', c_longlong), + ('st_atimespec', c_timespec), + ('st_mtimespec', c_timespec), + ('st_ctimespec', c_timespec)] + else: + # i686, use as fallback for everything else + c_stat._fields_ = [ + ('st_dev', c_dev_t), + ('__pad1', c_ushort), + ('__st_ino', c_ulong), + ('st_mode', c_mode_t), + ('st_nlink', c_uint), + ('st_uid', c_uid_t), + ('st_gid', c_gid_t), + ('st_rdev', c_dev_t), + ('__pad2', c_ushort), + ('st_size', c_off_t), + ('st_blksize', c_long), + ('st_blocks', c_longlong), + ('st_atimespec', c_timespec), + ('st_mtimespec', c_timespec), + ('st_ctimespec', c_timespec), + ('st_ino', c_ulonglong)] +else: + raise NotImplementedError('%s is not supported.' % _system) + +c_stat_p = POINTER(c_stat) + + +class c_statvfs(Structure): + _fields_ = [ + ('f_bsize', c_ulong), + ('f_frsize', c_ulong), + ('f_blocks', c_fsblkcnt_t), + ('f_bfree', c_fsblkcnt_t), + ('f_bavail', c_fsblkcnt_t), + ('f_files', c_fsfilcnt_t), + ('f_ffree', c_fsfilcnt_t), + ('f_favail', c_fsfilcnt_t)] + + +fuse_ino_t = c_ulong +fuse_req_t = c_void_p +ROOT_INODE = 1 + + +class fuse_args(Structure): + _fields_ = [('argc', c_int), ('argv', POINTER(c_char_p)), ('allocated', c_int)] + + +class fuse_file_info(Structure): + _fields_ = [ + ('flags', c_int), + ('fh_old', c_ulong), + ('writepage', c_int), + ('direct_io', c_uint, 1), + ('keep_cache', c_uint, 1), + ('flush', c_uint, 1), + ('padding', c_uint, 29), + ('fh', c_uint64), + ('lock_owner', c_uint64)] + + +class fuse_ctx(Structure): + _fields_ = [('uid', c_uid_t), ('gid', c_gid_t), ('pid', c_pid_t)] + + +fuse_file_info_p = POINTER(fuse_file_info) +fuse_opt_proc_t = CFUNCTYPE(c_int, c_void_p, c_char_p, c_int, POINTER(fuse_args)) + +FUSE_SET_ATTR = ('st_mode', 'st_uid', 'st_gid', 'st_size', 'st_atime', 'st_mtime') + + +class fuse_entry_param(Structure): + _fields_ = [ + ('ino', fuse_ino_t), + ('generation', c_ulong), + ('attr', c_stat), + ('attr_timeout', c_double), + ('entry_timeout', c_double)] + + +class fuse_lowlevel_ops(Structure): + _fields_ = [ + ('init', CFUNCTYPE(None, c_void_p, c_void_p)), + ('destroy', CFUNCTYPE(None, c_void_p)), + ('lookup', CFUNCTYPE(None, fuse_req_t, fuse_ino_t, c_char_p)), + ('forget', CFUNCTYPE(None, fuse_req_t, fuse_ino_t, c_ulong)), + ('getattr', CFUNCTYPE(None, fuse_req_t, fuse_ino_t, fuse_file_info_p)), + ('setattr', CFUNCTYPE(None, fuse_req_t, fuse_ino_t, c_stat_p, c_int, fuse_file_info_p)), + ('readlink', CFUNCTYPE(None, fuse_req_t, fuse_ino_t)), + ('mknod', CFUNCTYPE(None, fuse_req_t, fuse_ino_t, c_char_p, c_mode_t, c_dev_t)), + ('mkdir', CFUNCTYPE(None, fuse_req_t, fuse_ino_t, c_char_p, c_mode_t)), + ('unlink', CFUNCTYPE(None, fuse_req_t, fuse_ino_t, c_char_p)), + ('rmdir', CFUNCTYPE(None, fuse_req_t, fuse_ino_t, c_char_p)), + ('symlink', CFUNCTYPE(None, fuse_req_t, c_char_p, fuse_ino_t, c_char_p)), + ('rename', CFUNCTYPE(None, fuse_req_t, fuse_ino_t, c_char_p, fuse_ino_t, c_char_p)), + ('link', CFUNCTYPE(None, fuse_req_t, fuse_ino_t, fuse_ino_t, c_char_p)), + ('open', CFUNCTYPE(None, fuse_req_t, fuse_ino_t, fuse_file_info_p)), + ('read', CFUNCTYPE(None, fuse_req_t, fuse_ino_t, c_size_t, c_off_t, fuse_file_info_p)), + ('write', CFUNCTYPE(None, fuse_req_t, fuse_ino_t, c_char_p, c_size_t, c_off_t, fuse_file_info_p)), + ('flush', CFUNCTYPE(None, fuse_req_t, fuse_ino_t, fuse_file_info_p)), + ('release', CFUNCTYPE(None, fuse_req_t, fuse_ino_t, fuse_file_info_p)), + ('fsync', CFUNCTYPE(None, fuse_req_t, fuse_ino_t, c_int, fuse_file_info_p)), + ('opendir', CFUNCTYPE(None, fuse_req_t, fuse_ino_t, fuse_file_info_p)), + ('readdir', CFUNCTYPE(None, fuse_req_t, fuse_ino_t, c_size_t, c_off_t, fuse_file_info_p)), + ('releasedir', CFUNCTYPE(None, fuse_req_t, fuse_ino_t, fuse_file_info_p)), + ('fsyncdir', CFUNCTYPE(None, fuse_req_t, fuse_ino_t, c_int, fuse_file_info_p))] + + +if _system == 'Darwin': + self.libiconv = CDLL(find_library('iconv'), RTLD_GLOBAL) + +libfuse = CDLL(find_library('fuse')) +# 3d arg should be pointer of type fuse_opt +libfuse.fuse_opt_parse.argtypes = (POINTER(fuse_args), c_void_p, c_void_p, fuse_opt_proc_t) +libfuse.fuse_opt_parse.restype = c_int + +libfuse.fuse_mount.argtypes = (c_char_p, POINTER(fuse_args)) +libfuse.fuse_mount.restype = c_void_p +libfuse.fuse_lowlevel_new.argtypes = (POINTER(fuse_args), POINTER(fuse_lowlevel_ops), + c_size_t, c_void_p) +libfuse.fuse_lowlevel_new.restype = c_void_p +libfuse.fuse_set_signal_handlers.argtypes = (c_void_p,) +libfuse.fuse_session_add_chan.argtypes = (c_void_p, c_void_p) +libfuse.fuse_session_loop.argtypes = (c_void_p,) +libfuse.fuse_remove_signal_handlers.argtypes = (c_void_p,) +libfuse.fuse_session_remove_chan.argtypes = (c_void_p,) +libfuse.fuse_session_destroy.argtypes = (c_void_p,) +libfuse.fuse_unmount.argtypes = (c_char_p, c_void_p) + +libfuse.fuse_chan_fd.argtypes = (c_void_p,) +libfuse.fuse_chan_fd.restype = c_int +libfuse.fuse_chan_recv.argtypes = (c_void_p_p, c_char_p, c_size_t) +libfuse.fuse_chan_recv.restype = c_int +libfuse.fuse_session_process.argtypes = (c_void_p, c_char_p, c_size_t, c_void_p) + +libfuse.fuse_req_ctx.restype = POINTER(fuse_ctx) +libfuse.fuse_req_ctx.argtypes = (fuse_req_t,) + +libfuse.fuse_reply_err.argtypes = (fuse_req_t, c_int) +libfuse.fuse_reply_attr.argtypes = (fuse_req_t, c_void_p, c_double) +libfuse.fuse_reply_entry.argtypes = (fuse_req_t, c_void_p) +libfuse.fuse_reply_open.argtypes = (fuse_req_t, c_void_p) +libfuse.fuse_reply_buf.argtypes = (fuse_req_t, c_char_p, c_size_t) +libfuse.fuse_reply_readlink.argtypes = (fuse_req_t, c_char_p) +libfuse.fuse_reply_write.argtypes = (fuse_req_t, c_size_t) + +libfuse.fuse_add_direntry.argtypes = (c_void_p, c_char_p, c_size_t, c_char_p, + c_stat_p, c_off_t) + + +# Intermediate Binding +def setattr_mask_to_list(mask): + return tuple(FUSE_SET_ATTR[i] for i in range(len(FUSE_SET_ATTR)) if mask & (1 << i)) + + +class FUSEError(Exception): + def __init__(self, errno): + # Call the base class constructor with the parameters it needs + super(FUSEError, self).__init__("FS error: %s" % errorcode.get(errno, errno)) + + self.errno = errno + + +def _copy_value(x): + if hasattr(x, 'contents') and issubclass(x._type_, Structure): + res = x._type_() + pointer(res)[0] = x.contents + return res + else: + return x + + +@contextmanager +def _guard(fn): + yield + fn() + + +def _transient_directory(dir): + existed = path.exists(dir) + if not existed: + os.mkdir(dir) + + +def cleanup(): + if not existed: + os.rmdir(self._dir) + return _guard(cleanup) + + +class FUSELL(object): + def __init__(self, filesystem, mountpoint, fuse_options=[], parallel=8): + self.filesystem = filesystem + self.mountpoint = mountpoint + self.fuse_options = fuse_options + self.parallel = parallel + + args = ['fuse'] + for opt in self.fuse_options: + args += ['-o', opt] + self.argv = fuse_args(len(args), (c_char_p * len(args))(*args), 0) + self.chan = None + + def mount(self): + dir_cleanup = _transient_directory(self.mountpoint) + + def cleanup(): + with dir_cleanup: + libfuse.fuse_unmount(self.mountpoint, self.chan) + self.chan = None + + self.chan = libfuse.fuse_mount(self.mountpoint, self.argv) + return _guard(cleanup) + + def run(self): + assert self.chan + pool = Pool(size=self.parallel) + try: + self._fuse_run_session(pool) + finally: + pool.waitall() + + def _dispatcher(self, pool, method): + def _handler(req, *args): + try: + return method(req, *args) + except FUSEError, e: + return self.reply_err(req, e.errno) + + def _dispatch(req, *args): + # Copy pointer-values in args. They will not be valid later + args = [_copy_value(x) for x in args] + pool.spawn(_handler, req, *args) + return _dispatch + + def _fuse_run_session(self, pool): + fuse_ops = fuse_lowlevel_ops() + + for name, prototype in fuse_lowlevel_ops._fields_: + method = getattr(self, 'fuse_' + name, None) or getattr(self, name, None) + if method: + args = prototype._argtypes_ + if len(args) and args[0] is fuse_req_t: + setattr(fuse_ops, name, prototype(self._dispatcher(pool, method))) + else: + setattr(fuse_ops, name, prototype(method)) + + session = libfuse.fuse_lowlevel_new(self.argv, byref(fuse_ops), sizeof(fuse_ops), None) + assert session + try: + chan = self.chan + libfuse.fuse_session_add_chan(session, chan) + try: + fd = libfuse.fuse_chan_fd(chan) + while True: + trampoline(fd, read=True) + data = create_string_buffer(64 * 1024) + read = libfuse.fuse_chan_recv(c_void_p_p(c_void_p(chan)), data, len(data)) + assert read > 0 + libfuse.fuse_session_process(session, data[0:read], read, chan) + finally: + libfuse.fuse_session_remove_chan(chan) + finally: + libfuse.fuse_session_destroy(session) + + def reply_err(self, req, err): + return libfuse.fuse_reply_err(req, err) + + def reply_none(self, req): + libfuse.fuse_reply_none(req) + + def reply_entry(self, req, entry): + libfuse.fuse_reply_entry(req, byref(entry)) + + def reply_create(self, req, *args): + pass # XXX + + def reply_attr(self, req, attr, attr_timeout): + return libfuse.fuse_reply_attr(req, byref(attr), c_double(attr_timeout)) + + def reply_readlink(self, req, link_contents): + return libfuse.fuse_reply_readlink(req, create_string_buffer(link_contents)) + + def reply_open(self, req, fi): + return libfuse.fuse_reply_open(req, byref(fi)) + + def reply_write(self, req, count): + return libfuse.fuse_reply_write(req, count) + + def reply_buf(self, req, buf): + return libfuse.fuse_reply_buf(req, buf, len(buf or '')) + + def reply_readdir(self, req, size, entries): + off = 0 + buf = create_string_buffer(size) + for name, attr, index in entries: + bufptr = cast(addressof(buf) + off, c_char_p) + bufsize = size - off + entsize = libfuse.fuse_add_direntry(req, bufptr, bufsize, name, byref(attr), index) + if entsize >= bufsize: + break + off += entsize + + if off > 0: + return libfuse.fuse_reply_buf(req, buf, off) + else: + return libfuse.fuse_reply_buf(req, None, 0) + + # If you override the following methods you should reply directly + # with the libfuse.fuse_reply_* methods. + + def fuse_lookup(self, req, parent, name): + entry = self.filesystem.lookup(parent, name) + return self.reply_entry(req, entry) + + def fuse_forget(self, req, ino, nlookup): + self.filesystem.forget(ino, nlookup) + return self.reply_none(req) + + def fuse_getattr(self, req, ino, fi): + attr, timeout = self.filesystem.getattr(ino) + return self.reply_attr(req, attr, timeout) + + def fuse_setattr(self, req, ino, attr, to_set, fi): + to_set_list = setattr_mask_to_list(to_set) + self.filesystem.setattr(req, ino, attr_dict, to_set_list, fi) + + def fuse_open(self, req, ino, fi): + fi.fh = self.filesystem.open(ino, fi.flags) + return self.reply_open(req, fi) + + def fuse_read(self, req, ino, size, off, fi): + res = self.filesystem.read(fi.fh, off, size) + if res is None: + return self.reply_err(req, EIO) + else: + return self.reply_buf(req, res) + + def fuse_write(self, req, ino, buf, size, off, fi): + buf_str = string_at(buf, size) + written = self.filesystem.write(req, ino, buf_str, off, fi) + return self.reply_write(req, written) + + def fuse_release(self, req, ino, fi): + self.filesystem.release(fi.fh) + return self.reply_err(req, 0) + + def fuse_fsync(self, req, ino, datasync, fi): + self.filesystem.fsyncdir(req, ino, datasync, fi) + + def fuse_opendir(self, req, ino, fi): + fi.fh = self.filesystem.opendir(ino) + return self.reply_open(req, fi) + + def fuse_readdir(self, req, ino, size, off, fi): + entries = self.filesystem.readdir(ino, off) + return self.reply_readdir(req, size, entries) + + def fuse_releasedir(self, req, ino, fi): + self.filesystem.releasedir(ino) + return self.reply_err(req, 0) + + def fuse_readlink(self, req, ino): + contents = self.filesystem.readlink(ino) + return self.reply_readlink(req, contents) + + def fuse_mkdir(self, req, parent, name, mode): + entry = self.filesystem.mkdir(parent, name) + return self.reply_entry(req, entry) + + def fuse_rename(self, req, parent, name, newparent, newname): + self.filesystem.rename(parent, name, newparent, newname) + return self.reply_err(req, 0) + + def fuse_fsyncdir(self, req, ino, datasync, fi): + self.filesystem.fsyncdir(req, ino, datasync, fi) + + +class Filesystem: + def init(self, userdata, conn): + """Initialize filesystem + + There's no reply to this method + """ + pass + + def destroy(self, userdata): + """Clean up filesystem + + There's no reply to this method + """ + pass + + def lookup(self, parent, name): + """Look up a directory entry by name and get its attributes. + + Valid replies: + fuse_entry_param() + FUSEError + """ + raise FUSEError(ENOENT) + + def forget(self, req, ino, nlookup): + """Forget about an inode + + Valid replies: + None + """ + pass + + def getattr(self, req, ino, fi): + """Get file attributes + + Valid replies: + c_stat() + FUSEError + """ + if ino == 1: + attr = fusell.c_stat( + st_ino=1, + st_mode=S_IFDIR | 0755, + st_nlink=2, + ) + return attr, 1.0 + else: + raise FUSEError(ENOENT) + + def setattr(self, req, ino, attr, to_set, fi): + """Set file attributes + + Valid replies: + c_stat() + FUSEError + """ + raise FUSEError(ENOSYS) + + def readlink(self, req, ino): + """Read symbolic link + + Valid replies: + str() + FUSEError + """ + raise FUSEError(ENOENT) + + def mknod(self, req, parent, name, mode, rdev): + """Create file node + + Valid replies: + fuse_entry_param() + FUSEError + """ + raise FUSEError(ENOSYS) + + def mkdir(self, req, parent, name, mode): + """Create a directory + + Valid replies: + fuse_entry_param() + FUSEError + """ + raise FUSEError(ENOSYS) + + def unlink(self, req, parent, name): + """Remove a file + + Valid replies: + FUSEError + """ + raise FUSEError(ENOSYS) + + def rmdir(self, req, parent, name): + """Remove a directory + + Valid replies: + FUSEError + """ + raise FUSEError(ENOSYS) + + def symlink(self, req, link, parent, name): + """Create a symbolic link + + Valid replies: + fuse_entry_param() + FUSEError + """ + raise FUSEError(ENOSYS) + + def rename(self, req, parent, name, newparent, newname): + """Rename a file + + Valid replies: + FUSEError + """ + raise FUSEError(ENOSYS) + + def link(self, req, ino, newparent, newname): + """Create a hard link + + Valid replies: + fuse_entry_param() + FUSEError + """ + raise FUSEError(ENOSYS) + + def open(self, req, ino, flags): + """Open a file + + Valid replies: + int(), file handle + FUSEError + """ + raise FUSEError(EIO) + + def read(self, req, ino, size, off, fi): + """Read data + + Valid replies: + str() + FUSEError + """ + raise FUSEError(EIO) + + def write(self, req, ino, buf, off, fi): + """Write data + + Valid replies: + int(), written + FUSEError + """ + raise FUSEError(ENOSYS) + + def flush(self, req, ino, fi): + """Flush method + + Valid replies: + FUSEError + """ + raise FUSEError(0) + + def release(self, req, ino, fi): + """Release an open file + + Valid replies: + FUSEError + """ + raise FUSEError(0) + + def fsync(self, req, ino, datasync, fi): + """Synchronize file contents + + Valid replies: + FUSEError + """ + raise FUSEError(0) + + def opendir(self, req, ino, fi): + """Open a directory + + Valid replies: + int(), file handle + FUSEError + """ + self.reply_open(req, fi) + + def readdir(self, req, ino, size, off, fi): + """Read directory + + Valid replies: + generator of (str() as name, c_stat() as attr, int() as offset) + FUSEError + """ + if ino == 1: + attr = {'st_ino': 1, 'st_mode': S_IFDIR} + entries = [('.', attr), ('..', attr)] + self.reply_readdir(req, size, off, entries) + else: + raise FUSEError(ENOENT) + + def releasedir(self, req, ino, fi): + """Release an open directory + + Valid replies: + FUSEError + """ + raise FUSEError(0) + + def fsyncdir(self, req, ino, datasync, fi): + """Synchronize directory contents + + Valid replies: + FUSEError + """ + raise FUSEError(0) diff -Nru bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/fusell_test.py bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/fusell_test.py --- bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/fusell_test.py 1970-01-01 00:00:00.000000000 +0000 +++ bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/fusell_test.py 2017-10-20 13:01:54.000000000 +0000 @@ -0,0 +1,8 @@ +from nose.tools import * + +from fusell import * + + +def test_setattr_mask_to_list(): + assert_equal(setattr_mask_to_list((1 << len(FUSE_SET_ATTR)) - 1), FUSE_SET_ATTR) + assert_equal(setattr_mask_to_list(5), ('st_mode', 'st_gid')) diff -Nru bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/setup.py bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/setup.py --- bhindex-0.7~201708261727+5f343b5~ubuntu16.04.1/setup.py 2017-08-26 17:32:27.000000000 +0000 +++ bhindex-0.7~201710201247+d36d23f~ubuntu16.04.1/setup.py 2017-10-20 13:01:54.000000000 +0000 @@ -65,7 +65,7 @@ # simple. Or you can use find_packages(). packages=find_packages(exclude=['contrib', 'docs', 'tests']), - py_modules=['concurrent'], + py_modules=['concurrent', 'fusell'], # List run-time dependencies here. These will be installed by pip when # your project is installed. For an analysis of "install_requires" vs pip's