diff -Nru apertium-apy-0.1.0~r61159/debian/changelog apertium-apy-0.1.0~r61425/debian/changelog --- apertium-apy-0.1.0~r61159/debian/changelog 2015-07-17 18:36:49.000000000 +0000 +++ apertium-apy-0.1.0~r61425/debian/changelog 2015-08-14 05:59:51.000000000 +0000 @@ -1,3 +1,13 @@ +apertium-apy (0.1.0~r61425-1) unstable; urgency=low + + * New upstream snapshot. + * debian/copyright: + + Added license for toro.py + * debian/control: + + Relaxed dependencies. + + -- Kartik Mistry Fri, 14 Aug 2015 11:29:41 +0530 + apertium-apy (0.1.0~r61159-1) unstable; urgency=low * Initial release (Closes: #769063) diff -Nru apertium-apy-0.1.0~r61159/debian/control apertium-apy-0.1.0~r61425/debian/control --- apertium-apy-0.1.0~r61159/debian/control 2015-07-17 18:36:38.000000000 +0000 +++ apertium-apy-0.1.0~r61425/debian/control 2015-08-14 05:57:38.000000000 +0000 @@ -13,18 +13,16 @@ Package: apertium-apy Architecture: any Depends: adduser, - apertium (>= 3.3), + apertium (>= 3.4), apertium-lex-tools, - build-essential, libxml2-dev, libxslt-dev, logrotate, - lttoolbox (>= 3.3.1), python-toro, python3-dev, python3-lxml, python3-pip, - python3-tornado, + python3-tornado (>= 3.1), zlib1g-dev, ${misc:Depends}, ${python3:Depends}, diff -Nru apertium-apy-0.1.0~r61159/debian/copyright apertium-apy-0.1.0~r61425/debian/copyright --- apertium-apy-0.1.0~r61159/debian/copyright 2015-07-17 18:36:38.000000000 +0000 +++ apertium-apy-0.1.0~r61425/debian/copyright 2015-08-14 05:56:57.000000000 +0000 @@ -21,3 +21,18 @@ . On Debian systems, the complete text of the GNU General Public License version 3 can be found in "/usr/share/common-licenses/GPL-3". + +Files: toro.py +Copyright: 2012, A. Jesse Jiryu Davis +License: Apache-2.0 + Licensed under the Apache License, Version 2.0 (the "License"); you may + not use this file except in compliance with the License. You may obtain + a copy of the License at + . + http://www.apache.org/licenses/LICENSE-2.0 + . + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + License for the specific language governing permissions and limitations + under the License. diff -Nru apertium-apy-0.1.0~r61159/servlet.py apertium-apy-0.1.0~r61425/servlet.py --- apertium-apy-0.1.0~r61159/servlet.py 2015-07-02 12:14:27.000000000 +0000 +++ apertium-apy-0.1.0~r61425/servlet.py 2015-08-13 14:54:09.000000000 +0000 @@ -9,6 +9,7 @@ from functools import wraps from threading import Thread from datetime import datetime +import heapq import tornado, tornado.web, tornado.httpserver, tornado.process, tornado.iostream from tornado import escape, gen @@ -18,8 +19,6 @@ except ImportError: #2.1 from tornado.options import enable_pretty_logging -import toro - from modeSearch import searchPath from util import getLocalizedLanguages, apertium, bilingualTranslate, removeLast, stripTags, processPerWord, getCoverage, getCoverages, toAlpha3Code, toAlpha2Code, noteUnknownToken, scaleMtLog, TranslationInfo, closeDb, flushUnknownWords, inMemoryUnknownToken import translation @@ -57,7 +56,8 @@ analyzers = {} generators = {} taggers = {} - pipelines = {} # (l1, l2): (inpipe, outpipe), only contains flushing pairs! + pipelines = {} # (l1, l2): [translation.Pipeline], only contains flushing pairs! 
+ pipelines_holding = [] callback = None timeout = None scaleMtLogs = False @@ -67,15 +67,15 @@ stats = { 'useCount': {}, - 'lastUsage': {}, 'vmsize': 0, } - # The lock is needed so we don't let two coroutines write - # simultaneously to a pipeline; then the first call to read might - # read translations of text put there by the second call … - pipeline_locks = {} # (l1, l2): lock for (l1, l2) in pairs - pipeline_cmds = {} # (l1, l2): (do_flush, commands) + pipeline_cmds = {} # (l1, l2): translation.ParsedModes + max_pipes_per_pair = 1 + min_pipes_per_pair = 0 + max_users_per_pipe = 5 + max_idle_secs = 0 + restart_pipe_after = 1000 def initialize(self): self.callback = self.get_argument('callback', default=None) @@ -177,7 +177,14 @@ @tornado.web.asynchronous def get(self): self.sendResponse({ - 'responseData': { '%s-%s' % pair: useCount for pair, useCount in self.stats['useCount'].items() }, + 'responseData': { + 'useCount': { '%s-%s' % pair: useCount + for pair, useCount in self.stats['useCount'].items() }, + 'runningPipes': { '%s-%s' % pair: len(pipes) + for pair, pipes in self.pipelines.items() + if pipes != [] }, + 'holdingPipes': len(self.pipelines_holding), + }, 'responseDetails': None, 'responseStatus': 200 }) @@ -190,8 +197,6 @@ class TranslateHandler(BaseHandler): def notePairUsage(self, pair): self.stats['useCount'][pair] = 1 + self.stats['useCount'].get(pair, 0) - if self.max_idle_secs: - self.stats['lastUsage'][pair] = time.time() unknownMarkRE = re.compile(r'\*([^.,;:\t\* ]+)') def maybeStripMarks(self, markUnknown, l1, l2, translated): @@ -209,24 +214,36 @@ else: noteUnknownToken(token, pair, self.missingFreqs) - def shutdownPair(self, pair): - logging.info("shutting down") - self.pipelines[pair][0].stdin.close() - self.pipelines[pair][0].stdout.close() - self.pipelines.pop(pair) + def cleanable(self, i, pair, pipe): + if pipe.useCount > self.restart_pipe_after: + # Not affected by min_pipes_per_pair + logging.info('A pipe for pair %s-%s has handled %d requests, scheduling restart', + pair[0], pair[1], self.restart_pipe_after) + return True + elif (i >= self.min_pipes_per_pair + and self.max_idle_secs != 0 + and time.time() - pipe.lastUsage > self.max_idle_secs): + logging.info("A pipe for pair %s-%s hasn't been used in %d secs, scheduling shutdown", + pair[0], pair[1], self.max_idle_secs) + return True + else: + return False def cleanPairs(self): - if self.max_idle_secs: - for pair, lastUsage in self.stats['lastUsage'].items(): - if pair in self.pipelines and time.time() - lastUsage > self.max_idle_secs: - logging.info('Shutting down pair %s-%s since it has not been used in %d seconds' % ( - pair[0], pair[1], self.max_idle_secs)) - self.shutdownPair(pair) - - def getPipeLock(self, l1, l2): - if (l1, l2) not in self.pipeline_locks: - self.pipeline_locks[(l1, l2)] = toro.Lock() - return self.pipeline_locks[(l1, l2)] + for pair in self.pipelines: + pipes = self.pipelines[pair] + to_clean = set(p for i, p in enumerate(pipes) + if self.cleanable(i, pair, p)) + self.pipelines_holding += to_clean + pipes[:] = [p for p in pipes if not p in to_clean] + heapq.heapify(pipes) + # The holding area lets us restart pipes after n usages next + # time round, since with lots of traffic an active pipe may + # never reach 0 users + self.pipelines_holding[:] = [p for p in self.pipelines_holding + if p.users > 0] + if self.pipelines_holding: + logging.info("%d pipelines still scheduled for shutdown", len(self.pipelines_holding)) def getPipeCmds(self, l1, l2): if (l1, l2) not in 
self.pipeline_cmds: @@ -234,14 +251,30 @@ self.pipeline_cmds[(l1, l2)] = translation.parseModeFile(mode_path) return self.pipeline_cmds[(l1, l2)] + def shouldStartPipe(self, l1, l2): + pipes = self.pipelines.get((l1, l2), []) + if pipes == []: + logging.info("%s-%s not in pipelines of this process", + l1, l2) + return True + else: + min_p = pipes[0] + if len(pipes) < self.max_pipes_per_pair and min_p.users > self.max_users_per_pipe: + logging.info("%s-%s has ≥%d users per pipe but only %d pipes", + l1, l2, min_p.users, len(pipes)) + return True + else: + return False + def getPipeline(self, l1, l2): - do_flush, commands = self.getPipeCmds(l1, l2) - if not do_flush: - return None - if (l1, l2) not in self.pipelines: - logging.info('%s-%s not in pipelines of this process, starting …' % (l1, l2)) - self.pipelines[(l1, l2)] = translation.startPipeline(commands) - return self.pipelines[(l1, l2)] + pair = (l1, l2) + if self.shouldStartPipe(l1, l2): + logging.info("Starting up a new pipeline for %s-%s …", l1, l2) + if not pair in self.pipelines: + self.pipelines[pair] = [] + p = translation.makePipeline(self.getPipeCmds(l1, l2)) + heapq.heappush(self.pipelines[pair], p) + return self.pipelines[pair][0] def logBeforeTranslation(self): if self.scaleMtLogs: @@ -274,10 +307,9 @@ if '%s-%s' % (l1, l2) in self.pairs: before = self.logBeforeTranslation() - lock = self.getPipeLock(l1, l2) - _, commands = self.getPipeCmds(l1, l2) pipeline = self.getPipeline(l1, l2) - translated = yield translation.translate(toTranslate, lock, pipeline, commands) + self.notePairUsage((l1, l2)) + translated = yield pipeline.translate(toTranslate) self.logAfterTranslation(before, toTranslate) self.sendResponse({ 'responseData': { @@ -286,7 +318,6 @@ 'responseDetails': None, 'responseStatus': 200 }) - self.notePairUsage((l1, l2)) self.cleanPairs() else: self.send_error(400, explanation='That pair is not installed') @@ -641,7 +672,7 @@ missingFreqsDb = '' -def setupHandler(port, pairs_path, nonpairs_path, langNames, missingFreqs, timeout, max_idle_secs, verbosity=0, scaleMtLogs=False, memory=0): +def setupHandler(port, pairs_path, nonpairs_path, langNames, missingFreqs, timeout, max_pipes_per_pair, min_pipes_per_pair, max_users_per_pipe, max_idle_secs, restart_pipe_after, verbosity=0, scaleMtLogs=False, memory=0): global missingFreqsDb missingFreqsDb= missingFreqs @@ -650,7 +681,11 @@ Handler.langNames = langNames Handler.missingFreqs = missingFreqs Handler.timeout = timeout + Handler.max_pipes_per_pair = max_pipes_per_pair + Handler.min_pipes_per_pair = min_pipes_per_pair + Handler.max_users_per_pipe = max_users_per_pipe Handler.max_idle_secs = max_idle_secs + Handler.restart_pipe_after = restart_pipe_after Handler.scaleMtLogs = scaleMtLogs Handler.inMemoryUnknown = True if memory > 0 else False Handler.inMemoryLimit = memory @@ -673,8 +708,17 @@ for dirpath, modename, lang_pair in modes['tagger']: Handler.taggers[lang_pair] = (dirpath, modename) +def sanity_check(): + locale_vars = ["LANG", "LC_ALL"] + u8 = re.compile("UTF-?8", re.IGNORECASE) + if not any(re.search(u8, os.environ.get(key, "")) + for key in locale_vars): + print("servlet.py: error: APY needs a UTF-8 locale, please set LANG or LC_ALL", + file=sys.stderr) + sys.exit(1) if __name__ == '__main__': + sanity_check() parser = argparse.ArgumentParser(description='Start Apertium APY') parser.add_argument('pairs_path', help='path to Apertium installed pairs (all modes files in this path are included)') parser.add_argument('-s', '--nonpairs-path', help='path to Apertium 
SVN (only non-translator debug modes are included from this path)') @@ -684,10 +728,14 @@ parser.add_argument('-c', '--ssl-cert', help='path to SSL Certificate', default=None) parser.add_argument('-k', '--ssl-key', help='path to SSL Key File', default=None) parser.add_argument('-t', '--timeout', help='timeout for requests (default = 10)', type=int, default=10) - parser.add_argument('-j', '--num-processes', help='number of processes to run (default = number of cores)', type=int, default=0) + parser.add_argument('-j', '--num-processes', help='number of processes to run (default = 1; use 0 to run one http server per core, where each http server runs all available language pairs)', nargs='?', type=int, default=1) parser.add_argument('-d', '--daemon', help='daemon mode: redirects stdout and stderr to files apertium-apy.log and apertium-apy.err ; use with --log-path', action='store_true') parser.add_argument('-P', '--log-path', help='path to log output files to in daemon mode; defaults to local directory', default='./') - parser.add_argument('-m', '--max-idle-secs', help='shut down pipelines it have not been used in this many seconds', type=int, default=0) + parser.add_argument('-i', '--max-pipes-per-pair', help='how many pipelines we can spin up per language pair (default = 1)', type=int, default=1) + parser.add_argument('-n', '--min-pipes-per-pair', help='when shutting down pipelines, keep at least this many open per language pair (default = 0)', type=int, default=0) + parser.add_argument('-u', '--max-users-per-pipe', help='how many concurrent requests per pipeline before we consider spinning up a new one (default = 5)', type=int, default=5) + parser.add_argument('-m', '--max-idle-secs', help='if specified, shut down pipelines that have not been used in this many seconds', type=int, default=0) + parser.add_argument('-r', '--restart-pipe-after', help='restart a pipeline if it has had this many requests (default = 1000)', type=int, default=1000) parser.add_argument('-v', '--verbosity', help='logging verbosity', type=int, default=0) parser.add_argument('-S', '--scalemt-logs', help='generates ScaleMT-like logs; use with --log-path; disables', action='store_true') parser.add_argument('-M', '--unknown-memory-limit', help="keeps unknown words in memory until a limit is reached", type=int, default=0) @@ -718,7 +766,7 @@ if not cld2: logging.warning('Unable to import CLD2, continuing using naive method of language detection') - setupHandler(args.port, args.pairs_path, args.nonpairs_path, args.lang_names, args.missing_freqs, args.timeout, args.max_idle_secs, args.verbosity, args.scalemt_logs, args.unknown_memory_limit) + setupHandler(args.port, args.pairs_path, args.nonpairs_path, args.lang_names, args.missing_freqs, args.timeout, args.max_pipes_per_pair, args.min_pipes_per_pair, args.max_users_per_pipe, args.max_idle_secs, args.restart_pipe_after, args.verbosity, args.scalemt_logs, args.unknown_memory_limit) application = tornado.web.Application([ (r'/', RootHandler), diff -Nru apertium-apy-0.1.0~r61159/tools/apertiumlangs.sql apertium-apy-0.1.0~r61425/tools/apertiumlangs.sql --- apertium-apy-0.1.0~r61159/tools/apertiumlangs.sql 2015-07-02 12:14:27.000000000 +0000 +++ apertium-apy-0.1.0~r61425/tools/apertiumlangs.sql 2015-08-13 14:54:09.000000000 +0000 @@ -2721,7 +2721,7 @@ INSERT INTO "languageNames" VALUES(2735,'en','nl','Dutch'); INSERT INTO "languageNames" VALUES(2736,'en','nn','Norwegian Nynorsk'); INSERT INTO "languageNames" VALUES(2737,'en','no','Norwegian'); -INSERT INTO "languageNames" 
VALUES(2738,'en','nog','Nogai'); +INSERT INTO "languageNames" VALUES(2738,'en','nog','Nogay'); INSERT INTO "languageNames" VALUES(2739,'en','oc','Occitan'); INSERT INTO "languageNames" VALUES(2740,'en','os','Ossetic'); INSERT INTO "languageNames" VALUES(2741,'en','pa','Punjabi'); @@ -2756,7 +2756,7 @@ INSERT INTO "languageNames" VALUES(2770,'en','tl','Tagalog'); INSERT INTO "languageNames" VALUES(2771,'en','tr','Turkish'); INSERT INTO "languageNames" VALUES(2772,'en','tt','Tatar'); -INSERT INTO "languageNames" VALUES(2773,'en','tyv','Tuvinian'); +INSERT INTO "languageNames" VALUES(2773,'en','tyv','Tuvan'); INSERT INTO "languageNames" VALUES(2774,'en','udm','Udmurt'); INSERT INTO "languageNames" VALUES(2775,'en','uk','Ukrainian'); INSERT INTO "languageNames" VALUES(2776,'en','ur','Urdu'); diff -Nru apertium-apy-0.1.0~r61159/toro.py apertium-apy-0.1.0~r61425/toro.py --- apertium-apy-0.1.0~r61159/toro.py 1970-01-01 00:00:00.000000000 +0000 +++ apertium-apy-0.1.0~r61425/toro.py 2015-08-13 14:54:09.000000000 +0000 @@ -0,0 +1,983 @@ +# From https://github.com/ajdavis/toro/ + +# Toro Copyright (c) 2012 A. Jesse Jiryu Davis + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import contextlib +import heapq +import collections +from functools import partial +from queue import Full, Empty + +from tornado import ioloop +from tornado import gen +from tornado.concurrent import Future + + +version_tuple = (0, 8, '+') + +version = '.'.join(map(str, version_tuple)) +"""Current version of Toro.""" + + +__all__ = [ + # Exceptions + 'NotReady', 'AlreadySet', 'Full', 'Empty', 'Timeout', + + # Primitives + 'AsyncResult', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore', + 'Lock', + + # Queues + 'Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue' +] + + +class NotReady(Exception): + """Raised when accessing an :class:`AsyncResult` that has no value yet.""" + pass + + +class AlreadySet(Exception): + """Raised when setting a value on an :class:`AsyncResult` that already + has one.""" + pass + + +class Timeout(Exception): + """Raised when a deadline passes before a Future is ready.""" + + def __str__(self): + return "Timeout" + + +class _TimeoutFuture(Future): + + def __init__(self, deadline, io_loop): + """Create a Future with optional deadline. + + If deadline is not None, it may be a number denoting a unix timestamp + (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` object + for a deadline relative to the current time. + + set_exception(toro.Timeout()) is executed after a timeout. 
+ """ + + super(_TimeoutFuture, self).__init__() + self.io_loop = io_loop + if deadline is not None: + callback = partial(self.set_exception, Timeout()) + self._timeout_handle = io_loop.add_timeout(deadline, callback) + else: + self._timeout_handle = None + + def set_result(self, result): + self._cancel_timeout() + super(_TimeoutFuture, self).set_result(result) + + def set_exception(self, exception): + self._cancel_timeout() + super(_TimeoutFuture, self).set_exception(exception) + + def _cancel_timeout(self): + if self._timeout_handle: + self.io_loop.remove_timeout(self._timeout_handle) + self._timeout_handle = None + + +class _ContextManagerList(list): + def __enter__(self, *args, **kwargs): + for obj in self: + obj.__enter__(*args, **kwargs) + + def __exit__(self, *args, **kwargs): + for obj in self: + obj.__exit__(*args, **kwargs) + + +class _ContextManagerFuture(Future): + """A Future that can be used with the "with" statement. + + When a coroutine yields this Future, the return value is a context manager + that can be used like: + + with (yield future): + pass + + At the end of the block, the Future's exit callback is run. Used for + Lock.acquire() and Semaphore.acquire(). + """ + def __init__(self, wrapped, exit_callback): + super(_ContextManagerFuture, self).__init__() + wrapped.add_done_callback(self._done_callback) + self.exit_callback = exit_callback + + def _done_callback(self, wrapped): + if wrapped.exception(): + self.set_exception(wrapped.exception()) + else: + self.set_result(wrapped.result()) + + def result(self): + if self.exception(): + raise self.exception() + + # Otherwise return a context manager that cleans up after the block. + @contextlib.contextmanager + def f(): + try: + yield + finally: + self.exit_callback() + return f() + + +def _consume_expired_waiters(waiters): + # Delete waiters at the head of the queue who've timed out + while waiters and waiters[0].done(): + waiters.popleft() + + +_null_result = object() + + +class AsyncResult(object): + """A one-time event that stores a value or an exception. + + The only distinction between AsyncResult and a simple Future is that + AsyncResult lets coroutines wait with a deadline. The deadline can be + configured separately for each waiter. + + An :class:`AsyncResult` instance cannot be reset. + + :Parameters: + - `io_loop`: Optional custom IOLoop. + """ + + def __init__(self, io_loop=None): + self.io_loop = io_loop or ioloop.IOLoop.current() + self.value = _null_result + self.waiters = [] + + def __str__(self): + result = '<%s ' % (self.__class__.__name__, ) + if self.ready(): + result += 'value=%r' % self.value + else: + result += 'unset' + if self.waiters: + result += ' waiters[%s]' % len(self.waiters) + + return result + '>' + + def set(self, value): + """Set a value and wake up all the waiters.""" + if self.ready(): + raise AlreadySet + + self.value = value + waiters, self.waiters = self.waiters, [] + for waiter in waiters: + if not waiter.done(): # Might have timed out + waiter.set_result(value) + + def ready(self): + return self.value is not _null_result + + def get(self, deadline=None): + """Get a value once :meth:`set` is called. Returns a Future. + + The Future's result will be the value. The Future raises + :exc:`toro.Timeout` if no value is set before the deadline. + + :Parameters: + - `deadline`: Optional timeout, either an absolute timestamp + (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for + a deadline relative to the current time. 
+ """ + future = _TimeoutFuture(deadline, self.io_loop) + if self.ready(): + future.set_result(self.value) + else: + self.waiters.append(future) + + return future + + def get_nowait(self): + """Get the value if ready, or raise :class:`NotReady`.""" + if self.ready(): + return self.value + else: + raise NotReady + + +class Condition(object): + """A condition allows one or more coroutines to wait until notified. + + Like a standard Condition_, but does not need an underlying lock that + is acquired and released. + + .. _Condition: http://docs.python.org/library/threading.html#threading.Condition + + :Parameters: + - `io_loop`: Optional custom IOLoop. + """ + + def __init__(self, io_loop=None): + self.io_loop = io_loop or ioloop.IOLoop.current() + self.waiters = collections.deque() # Queue of _Waiter objects + + def __str__(self): + result = '<%s' % (self.__class__.__name__, ) + if self.waiters: + result += ' waiters[%s]' % len(self.waiters) + return result + '>' + + def wait(self, deadline=None): + """Wait for :meth:`notify`. Returns a Future. + + :exc:`~toro.Timeout` is executed after a timeout. + + :Parameters: + - `deadline`: Optional timeout, either an absolute timestamp + (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a + deadline relative to the current time. + """ + future = _TimeoutFuture(deadline, self.io_loop) + self.waiters.append(future) + return future + + def notify(self, n=1): + """Wake up `n` waiters. + + :Parameters: + - `n`: The number of waiters to awaken (default: 1) + """ + waiters = [] # Waiters we plan to run right now + while n and self.waiters: + waiter = self.waiters.popleft() + if not waiter.done(): # Might have timed out + n -= 1 + waiters.append(waiter) + + for waiter in waiters: + waiter.set_result(None) + + def notify_all(self): + """Wake up all waiters.""" + self.notify(len(self.waiters)) + + +# TODO: show correct examples that avoid thread / process issues w/ concurrent.futures.Future +class Event(object): + """An event blocks coroutines until its internal flag is set to True. + + Similar to threading.Event_. + + .. _threading.Event: http://docs.python.org/library/threading.html#threading.Event + + .. seealso:: :doc:`examples/event_example` + + :Parameters: + - `io_loop`: Optional custom IOLoop. + """ + + def __init__(self, io_loop=None): + self.io_loop = io_loop or ioloop.IOLoop.current() + self.condition = Condition(io_loop=io_loop) + self._flag = False + + def __str__(self): + return '<%s %s>' % ( + self.__class__.__name__, 'set' if self._flag else 'clear') + + def is_set(self): + """Return ``True`` if and only if the internal flag is true.""" + return self._flag + + def set(self): + """Set the internal flag to ``True``. All waiters are awakened. + Calling :meth:`wait` once the flag is true will not block. + """ + self._flag = True + self.condition.notify_all() + + def clear(self): + """Reset the internal flag to ``False``. Calls to :meth:`wait` + will block until :meth:`set` is called. + """ + self._flag = False + + def wait(self, deadline=None): + """Block until the internal flag is true. Returns a Future. + + The Future raises :exc:`~toro.Timeout` after a timeout. + + :Parameters: + - `callback`: Function taking no arguments. + - `deadline`: Optional timeout, either an absolute timestamp + (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a + deadline relative to the current time. 
+ """ + if self._flag: + future = _TimeoutFuture(None, self.io_loop) + future.set_result(None) + return future + else: + return self.condition.wait(deadline) + + +class Queue(object): + """Create a queue object with a given maximum size. + + If `maxsize` is 0 (the default) the queue size is unbounded. + + Unlike the `standard Queue`_, you can reliably know this Queue's size + with :meth:`qsize`, since your single-threaded Tornado application won't + be interrupted between calling :meth:`qsize` and doing an operation on the + Queue. + + **Examples:** + + :doc:`examples/producer_consumer_example` + + :doc:`examples/web_spider_example` + + :Parameters: + - `maxsize`: Optional size limit (no limit by default). + - `io_loop`: Optional custom IOLoop. + + .. _`Gevent's Queue`: http://www.gevent.org/gevent.queue.html + + .. _`standard Queue`: http://docs.python.org/library/queue.html#Queue.Queue + """ + def __init__(self, maxsize=0, io_loop=None): + self.io_loop = io_loop or ioloop.IOLoop.current() + if maxsize is None: + raise TypeError("maxsize can't be None") + + if maxsize < 0: + raise ValueError("maxsize can't be negative") + + self._maxsize = maxsize + + # _TimeoutFutures + self.getters = collections.deque([]) + # Pairs of (item, _TimeoutFuture) + self.putters = collections.deque([]) + self._init(maxsize) + + def _init(self, maxsize): + self.queue = collections.deque() + + def _get(self): + return self.queue.popleft() + + def _put(self, item): + self.queue.append(item) + + def __repr__(self): + return '<%s at %s %s>' % ( + type(self).__name__, hex(id(self)), self._format()) + + def __str__(self): + return '<%s %s>' % (type(self).__name__, self._format()) + + def _format(self): + result = 'maxsize=%r' % (self.maxsize, ) + if getattr(self, 'queue', None): + result += ' queue=%r' % self.queue + if self.getters: + result += ' getters[%s]' % len(self.getters) + if self.putters: + result += ' putters[%s]' % len(self.putters) + return result + + def _consume_expired_putters(self): + # Delete waiters at the head of the queue who've timed out + while self.putters and self.putters[0][1].done(): + self.putters.popleft() + + def qsize(self): + """Number of items in the queue""" + return len(self.queue) + + @property + def maxsize(self): + """Number of items allowed in the queue.""" + return self._maxsize + + def empty(self): + """Return ``True`` if the queue is empty, ``False`` otherwise.""" + return not self.queue + + def full(self): + """Return ``True`` if there are `maxsize` items in the queue. + + .. note:: if the Queue was initialized with `maxsize=0` + (the default), then :meth:`full` is never ``True``. + """ + if self.maxsize == 0: + return False + else: + return self.maxsize <= self.qsize() + + def put(self, item, deadline=None): + """Put an item into the queue. Returns a Future. + + The Future blocks until a free slot is available for `item`, or raises + :exc:`toro.Timeout`. + + :Parameters: + - `deadline`: Optional timeout, either an absolute timestamp + (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a + deadline relative to the current time. + """ + _consume_expired_waiters(self.getters) + future = _TimeoutFuture(deadline, self.io_loop) + if self.getters: + assert not self.queue, "queue non-empty, why are getters waiting?" + getter = self.getters.popleft() + + # Use _put and _get instead of passing item straight to getter, in + # case a subclass has logic that must run (e.g. JoinableQueue). 
+ self._put(item) + getter.set_result(self._get()) + future.set_result(None) + else: + if self.maxsize and self.maxsize <= self.qsize(): + self.putters.append((item, future)) + else: + self._put(item) + future.set_result(None) + + return future + + def put_nowait(self, item): + """Put an item into the queue without blocking. + + If no free slot is immediately available, raise queue.Full. + """ + _consume_expired_waiters(self.getters) + if self.getters: + assert not self.queue, "queue non-empty, why are getters waiting?" + getter = self.getters.popleft() + + self._put(item) + getter.set_result(self._get()) + elif self.maxsize and self.maxsize <= self.qsize(): + raise Full + else: + self._put(item) + + def get(self, deadline=None): + """Remove and return an item from the queue. Returns a Future. + + The Future blocks until an item is available, or raises + :exc:`toro.Timeout`. + + :Parameters: + - `deadline`: Optional timeout, either an absolute timestamp + (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a + deadline relative to the current time. + """ + self._consume_expired_putters() + future = _TimeoutFuture(deadline, self.io_loop) + if self.putters: + assert self.full(), "queue not full, why are putters waiting?" + item, putter = self.putters.popleft() + self._put(item) + putter.set_result(None) + future.set_result(self._get()) + elif self.qsize(): + future.set_result(self._get()) + else: + self.getters.append(future) + + return future + + def get_nowait(self): + """Remove and return an item from the queue without blocking. + + Return an item if one is immediately available, else raise + :exc:`queue.Empty`. + """ + self._consume_expired_putters() + if self.putters: + assert self.full(), "queue not full, why are putters waiting?" + item, putter = self.putters.popleft() + self._put(item) + putter.set_result(None) + return self._get() + elif self.qsize(): + return self._get() + else: + raise Empty + + +class PriorityQueue(Queue): + """A subclass of :class:`Queue` that retrieves entries in priority order + (lowest first). + + Entries are typically tuples of the form: ``(priority number, data)``. + + :Parameters: + - `maxsize`: Optional size limit (no limit by default). + - `io_loop`: Optional custom IOLoop. + """ + def _init(self, maxsize): + self.queue = [] + + def _put(self, item, heappush=heapq.heappush): + heappush(self.queue, item) + + def _get(self, heappop=heapq.heappop): + return heappop(self.queue) + + +class LifoQueue(Queue): + """A subclass of :class:`Queue` that retrieves most recently added entries + first. + + :Parameters: + - `maxsize`: Optional size limit (no limit by default). + - `io_loop`: Optional custom IOLoop. + """ + def _init(self, maxsize): + self.queue = [] + + def _put(self, item): + self.queue.append(item) + + def _get(self): + return self.queue.pop() + + +class JoinableQueue(Queue): + """A subclass of :class:`Queue` that additionally has :meth:`task_done` + and :meth:`join` methods. + + .. seealso:: :doc:`examples/web_spider_example` + + :Parameters: + - `maxsize`: Optional size limit (no limit by default). + - `io_loop`: Optional custom IOLoop. 
+ """ + def __init__(self, maxsize=0, io_loop=None): + Queue.__init__(self, maxsize=maxsize, io_loop=io_loop) + self.unfinished_tasks = 0 + self._finished = Event(io_loop) + self._finished.set() + + def _format(self): + result = Queue._format(self) + if self.unfinished_tasks: + result += ' tasks=%s' % self.unfinished_tasks + return result + + def _put(self, item): + self.unfinished_tasks += 1 + self._finished.clear() + Queue._put(self, item) + + def task_done(self): + """Indicate that a formerly enqueued task is complete. + + Used by queue consumers. For each :meth:`get ` used to + fetch a task, a subsequent call to :meth:`task_done` tells the queue + that the processing on the task is complete. + + If a :meth:`join` is currently blocking, it will resume when all + items have been processed (meaning that a :meth:`task_done` call was + received for every item that had been :meth:`put ` into the + queue). + + Raises ``ValueError`` if called more times than there were items + placed in the queue. + """ + if self.unfinished_tasks <= 0: + raise ValueError('task_done() called too many times') + self.unfinished_tasks -= 1 + if self.unfinished_tasks == 0: + self._finished.set() + + def join(self, deadline=None): + """Block until all items in the queue are processed. Returns a Future. + + The count of unfinished tasks goes up whenever an item is added to + the queue. The count goes down whenever a consumer calls + :meth:`task_done` to indicate that all work on the item is complete. + When the count of unfinished tasks drops to zero, :meth:`join` + unblocks. + + The Future raises :exc:`toro.Timeout` if the count is not zero before + the deadline. + + :Parameters: + - `deadline`: Optional timeout, either an absolute timestamp + (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a + deadline relative to the current time. + """ + return self._finished.wait(deadline) + + +class Semaphore(object): + """A lock that can be acquired a fixed number of times before blocking. + + A Semaphore manages a counter representing the number of release() calls + minus the number of acquire() calls, plus an initial value. The acquire() + method blocks if necessary until it can return without making the counter + negative. + + If not given, value defaults to 1. + + :meth:`acquire` supports the context manager protocol: + + >>> from tornado import gen + >>> import toro + >>> semaphore = toro.Semaphore() + >>> + >>> @gen.coroutine + ... def f(): + ... with (yield semaphore.acquire()): + ... assert semaphore.locked() + ... + ... assert not semaphore.locked() + + .. note:: Unlike the standard threading.Semaphore_, a :class:`Semaphore` + can tell you the current value of its :attr:`counter`, because code in a + single-threaded Tornado app can check these values and act upon them + without fear of interruption from another thread. + + .. _threading.Semaphore: http://docs.python.org/library/threading.html#threading.Semaphore + + .. seealso:: :doc:`examples/web_spider_example` + + :Parameters: + - `value`: An int, the initial value (default 1). + - `io_loop`: Optional custom IOLoop. 
+ """ + def __init__(self, value=1, io_loop=None): + if value < 0: + raise ValueError('semaphore initial value must be >= 0') + + # The semaphore is implemented as a Queue with 'value' objects + self.q = Queue(io_loop=io_loop) + for _ in range(value): + self.q.put_nowait(None) + + self._unlocked = Event(io_loop=io_loop) + if value: + self._unlocked.set() + + def __repr__(self): + return '<%s at %s%s>' % ( + type(self).__name__, hex(id(self)), self._format()) + + def __str__(self): + return '<%s%s>' % ( + self.__class__.__name__, self._format()) + + def _format(self): + return ' counter=%s' % self.counter + + @property + def counter(self): + """An integer, the current semaphore value""" + return self.q.qsize() + + def locked(self): + """True if :attr:`counter` is zero""" + return self.q.empty() + + def release(self): + """Increment :attr:`counter` and wake one waiter. + """ + self.q.put(None) + if not self.locked(): + # No one was waiting on acquire(), so self.q.qsize() is positive + self._unlocked.set() + + def wait(self, deadline=None): + """Wait for :attr:`locked` to be False. Returns a Future. + + The Future raises :exc:`toro.Timeout` after the deadline. + + :Parameters: + - `deadline`: Optional timeout, either an absolute timestamp + (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a + deadline relative to the current time. + """ + return self._unlocked.wait(deadline) + + def acquire(self, deadline=None): + """Decrement :attr:`counter`. Returns a Future. + + Block if the counter is zero and wait for a :meth:`release`. The + Future raises :exc:`toro.Timeout` after the deadline. + + :Parameters: + - `deadline`: Optional timeout, either an absolute timestamp + (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a + deadline relative to the current time. + """ + queue_future = self.q.get(deadline) + if self.q.empty(): + self._unlocked.clear() + future = _ContextManagerFuture(queue_future, self.release) + return future + + def __enter__(self): + raise RuntimeError( + "Use Semaphore like 'with (yield semaphore)', not like" + " 'with semaphore'") + + __exit__ = __enter__ + + +class BoundedSemaphore(Semaphore): + """A semaphore that prevents release() being called too often. + + A bounded semaphore checks to make sure its current value doesn't exceed + its initial value. If it does, ``ValueError`` is raised. In most + situations semaphores are used to guard resources with limited capacity. + If the semaphore is released too many times it's a sign of a bug. + + If not given, *value* defaults to 1. + + .. seealso:: :doc:`examples/web_spider_example` + """ + def __init__(self, value=1, io_loop=None): + super(BoundedSemaphore, self).__init__(value=value, io_loop=io_loop) + self._initial_value = value + + def release(self): + if self.counter >= self._initial_value: + raise ValueError("Semaphore released too many times") + return super(BoundedSemaphore, self).release() + + +class Lock(object): + """A lock for coroutines. + + It is created unlocked. When unlocked, :meth:`acquire` changes the state + to locked. When the state is locked, yielding :meth:`acquire` waits until + a call to :meth:`release`. + + The :meth:`release` method should only be called in the locked state; + an attempt to release an unlocked lock raises RuntimeError. + + When more than one coroutine is waiting for the lock, the first one + registered is awakened by :meth:`release`. 
+ + :meth:`acquire` supports the context manager protocol: + + >>> from tornado import gen + >>> import toro + >>> lock = toro.Lock() + >>> + >>> @gen.coroutine + ... def f(): + ... with (yield lock.acquire()): + ... assert lock.locked() + ... + ... assert not lock.locked() + + .. note:: Unlike with the standard threading.Lock_, code in a + single-threaded Tornado application can check if a :class:`Lock` + is :meth:`locked`, and act on that information without fear that another + thread has grabbed the lock, provided you do not yield to the IOLoop + between checking :meth:`locked` and using a protected resource. + + .. _threading.Lock: http://docs.python.org/2/library/threading.html#lock-objects + + .. seealso:: :doc:`examples/lock_example` + + :Parameters: + - `io_loop`: Optional custom IOLoop. + """ + def __init__(self, io_loop=None): + self._block = BoundedSemaphore(value=1, io_loop=io_loop) + + def __str__(self): + return "<%s _block=%s>" % ( + self.__class__.__name__, + self._block) + + def acquire(self, deadline=None): + """Attempt to lock. Returns a Future. + + The Future raises :exc:`toro.Timeout` if the deadline passes. + + :Parameters: + - `deadline`: Optional timeout, either an absolute timestamp + (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a + deadline relative to the current time. + """ + return self._block.acquire(deadline) + + def release(self): + """Unlock. + + If any coroutines are waiting for :meth:`acquire`, + the first in line is awakened. + + If not locked, raise a RuntimeError. + """ + if not self.locked(): + raise RuntimeError('release unlocked lock') + self._block.release() + + def locked(self): + """``True`` if the lock has been acquired""" + return self._block.locked() + + def __enter__(self): + raise RuntimeError( + "Use Lock like 'with (yield lock)', not like" + " 'with lock'") + + __exit__ = __enter__ + + +class RWLock(object): + """A reader-writer lock for coroutines. + + It is created unlocked. When unlocked, :meth:`acquire_write` always changes + the state to locked. When unlocked, :meth:`acquire_read` can change the + state to locked once :meth:`acquire_read` has been called max_readers times. + When the state is locked, yielding :meth:`acquire_read`/:meth:`acquire_write` + waits until a call to :meth:`release_write` in case of locking on write, or + :meth:`release_read` in case of locking on read. + + The :meth:`release_read` method should only be called in the locked-on-read + state; an attempt to release an unlocked lock raises RuntimeError. + + The :meth:`release_write` method should only be called in the + locked-on-write state; an attempt to release an unlocked lock raises RuntimeError. + + When more than one coroutine is waiting for the lock, the first one + registered is awakened by :meth:`release_read`/:meth:`release_write`. + + :meth:`acquire_read`/:meth:`acquire_write` support the context manager + protocol: + + >>> from tornado import gen + >>> import toro + >>> lock = toro.RWLock(max_readers=10) + >>> + >>> @gen.coroutine + ... def f(): + ... with (yield lock.acquire_read()): + ... assert not lock.locked() + ... + ... with (yield lock.acquire_write()): + ... assert lock.locked() + ... + ... assert not lock.locked() + + .. 
note:: Unlike with the standard threading.Lock_, code in a + single-threaded Tornado application can check if a :class:`RWLock` + is :meth:`locked`, and act on that information without fear that another + thread has grabbed the lock, provided you do not yield to the IOLoop + between checking :meth:`locked` and using a protected resource. + + .. _threading.Lock: http://docs.python.org/2/library/threading.html#lock-objects + + :Parameters: + - `max_readers`: Optional max readers value, default 1. + - `io_loop`: Optional custom IOLoop. + """ + def __init__(self, max_readers=1, io_loop=None): + self._max_readers = max_readers + self._block = BoundedSemaphore(value=max_readers, io_loop=io_loop) + + def __str__(self): + return "<%s _block=%s>" % ( + self.__class__.__name__, + self._block) + + def acquire_read(self, deadline=None): + """Attempt to lock for read. Returns a Future. + + The Future raises :exc:`toro.Timeout` if the deadline passes. + + :Parameters: + - `deadline`: Optional timeout, either an absolute timestamp + (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for + a deadline relative to the current time. + """ + return self._block.acquire(deadline) + + @gen.coroutine + def acquire_write(self, deadline=None): + """Attempt to lock for write. Returns a Future. + + The Future raises :exc:`toro.Timeout` if the deadline passes. + + :Parameters: + - `deadline`: Optional timeout, either an absolute timestamp + (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for + a deadline relative to the current time. + """ + futures = [self._block.acquire(deadline) for _ in + range(self._max_readers)] + try: + managers = yield futures + except Timeout: + for f in futures: + # Avoid traceback logging. + f.exception() + raise + + raise gen.Return(_ContextManagerList(managers)) + + def release_read(self): + """Releases one reader. + + If any coroutines are waiting for :meth:`acquire_read` (in case of a full + readers queue), the first in line is awakened. + + If not locked, raise a RuntimeError. + """ + if not self.locked(): + raise RuntimeError('release unlocked lock') + self._block.release() + + def release_write(self): + """Releases after write. + + The first in queue will be awakened after release. + + If not locked, raise a RuntimeError. 
+ """ + if not self.locked(): + raise RuntimeError('release unlocked lock') + for i in xrange(self._max_readers): + self._block.release() + + def locked(self): + """``True`` if the lock has been acquired""" + return self._block.locked() + + def __enter__(self): + raise RuntimeError( + "Use RWLock like 'with (yield lock)', not like" + " 'with lock'") + + __exit__ = __enter__ diff -Nru apertium-apy-0.1.0~r61159/translation.py apertium-apy-0.1.0~r61425/translation.py --- apertium-apy-0.1.0~r61159/translation.py 2015-07-02 12:14:27.000000000 +0000 +++ apertium-apy-0.1.0~r61425/translation.py 2015-08-13 14:54:09.000000000 +0000 @@ -2,8 +2,88 @@ from subprocess import Popen, PIPE from tornado import gen import tornado.process, tornado.iostream +try: # >=4.2 + import tornado.locks as locks +except ImportError: + import toro as locks import logging from select import PIPE_BUF +from contextlib import contextmanager +from collections import namedtuple +from time import time + +class Pipeline(object): + def __init__(self): + # The lock is needed so we don't let two coroutines write + # simultaneously to a pipeline; then the first call to read might + # read translations of text put there by the second call … + self.lock = locks.Lock() + # The users count is how many requests have picked this + # pipeline for translation. If this is 0, we can safely shut + # down the pipeline. + self.users = 0 + self.lastUsage = 0 + self.useCount = 0 + + @contextmanager + def use(self): + self.lastUsage = time() + self.users += 1 + try: + yield + finally: + self.users -= 1 + self.lastUsage = time() + self.useCount += 1 + + def __lt__(self, other): + return self.users < other.users + + @gen.coroutine + def translate(self, _): + raise Exception("Not implemented, subclass me!") + +class FlushingPipeline(Pipeline): + def __init__(self, commands, *args, **kwargs): + self.inpipe, self.outpipe = startPipeline(commands) + super().__init__(*args, **kwargs) + + def __del__(self): + logging.debug("shutting down FlushingPipeline that was used %d times", self.useCount) + self.inpipe.stdin.close() + self.inpipe.stdout.close() + # TODO: It seems the process immediately becomes , + # but only completely removed after a second request to the + # server – why? 
+ @gen.coroutine + def translate(self, toTranslate): + with self.use(): + all_split = splitForTranslation(toTranslate, n_users=self.users) + parts = yield [translateNULFlush(part, self) for part in all_split] + return "".join(parts) + +class SimplePipeline(Pipeline): + def __init__(self, commands, *args, **kwargs): + self.commands = commands + super().__init__(*args, **kwargs) + + @gen.coroutine + def translate(self, toTranslate): + with self.use(): + with (yield self.lock.acquire()): + res = yield translateSimple(toTranslate, self.commands) + return res + + +ParsedModes = namedtuple('ParsedModes', 'do_flush commands') + +def makePipeline(modes_parsed): + if modes_parsed.do_flush: + return FlushingPipeline(modes_parsed.commands) + else: + return SimplePipeline(modes_parsed.commands) + def startPipeline(commands): procs = [] @@ -19,9 +99,9 @@ procs.append(tornado.process.Subprocess(cmd, stdin=in_from, stdout=out_from)) - return procs[0], procs[-1] + def parseModeFile(mode_path): mode_str = open(mode_path, 'r').read().strip() if mode_str: @@ -41,12 +121,12 @@ commands = [] for cmd in mode_str.strip().split('|'): cmd = cmd.replace('$2', '').replace('$1', '-g') - cmd = re.sub('^(\S*)', '\g<1> -z', cmd) + cmd = re.sub(r'^(\S*)', r'\g<1> -z', cmd) commands.append(cmd.split()) - return do_flush, commands + return ParsedModes(do_flush, commands) else: - logging.error('Could not parse mode file %s' % mode_path) - raise Exception('Could not parse mode file %s' % mode_path) + logging.error('Could not parse mode file %s', mode_path) + raise Exception('Could not parse mode file %s' % mode_path) def upToBytes(string, max_bytes): @@ -66,7 +146,7 @@ l -= 1 return 0 -def hardbreakFn(string, rush_hour): +def hardbreakFn(string, n_users): """If others are queueing up to translate at the same time, we send short requests, otherwise we try to minimise the number of requests, but without letting buffers fill up. These numbers could probably be tweaked a lot. """ - if rush_hour: + if n_users > 2: return 1000 else: return upToBytes(string, PIPE_BUF) def preferPunctBreak(string, last, hardbreak): """We would prefer to split on a period or space seen before the - hardbreak, if we can. + hardbreak, if we can. 
If the remaining string is shorter than + or equal to the hardbreak, return the end of the string. + """ + + if len(string[last:]) <= hardbreak: + return last + hardbreak + 1 + softbreak = int(hardbreak/2)+1 softnext = last + softbreak hardnext = last + hardbreak dot = string.rfind(".", softnext, hardnext) if dot>-1: - return dot + return dot+1 else: space = string.rfind(" ", softnext, hardnext) if space>-1: - return space + return space+1 else: return hardnext -def splitForTranslation(toTranslate, rush_hour): +def splitForTranslation(toTranslate, n_users): """Splitting it up a bit ensures we don't fill up FIFO buffers (leads to processes hanging on read/write).""" allSplit = [] # [].append and join faster than str += @@ -105,16 +190,18 @@ rounds=0 while last < len(toTranslate) and rounds<10: rounds+=1 - hardbreak = hardbreakFn(toTranslate[last:], rush_hour) + hardbreak = hardbreakFn(toTranslate[last:], n_users) next = preferPunctBreak(toTranslate, last, hardbreak) allSplit.append(toTranslate[last:next]) + #logging.getLogger().setLevel(logging.DEBUG) + logging.debug("splitForTranslation: last:%s hardbreak:%s next:%s appending:%s", last, hardbreak, next, toTranslate[last:next]) last = next return allSplit @gen.coroutine -def translateNULFlush(toTranslate, lock, pipeline): - with (yield lock.acquire()): - proc_in, proc_out = pipeline +def translateNULFlush(toTranslate, pipeline): + with (yield pipeline.lock.acquire()): + proc_in, proc_out = pipeline.inpipe, pipeline.outpipe proc_deformat = Popen("apertium-deshtml", stdin=PIPE, stdout=PIPE) proc_deformat.stdin.write(bytes(toTranslate, 'utf-8')) @@ -125,14 +212,14 @@ # TODO: PipeIOStream has no flush, but seems to work anyway? #proc_in.stdin.flush() - output = yield proc_out.stdout.read_until(bytes('\0', 'utf-8')) + output = yield gen.Task(proc_out.stdout.read_until, bytes('\0', 'utf-8')) proc_reformat = Popen("apertium-rehtml-noent", stdin=PIPE, stdout=PIPE) proc_reformat.stdin.write(output) return proc_reformat.communicate()[0].decode('utf-8') -def translateWithoutFlush(toTranslate, lock, pipeline): +def translateWithoutFlush(toTranslate, proc_in, proc_out): proc_deformat = Popen("apertium-deshtml", stdin=PIPE, stdout=PIPE) proc_deformat.stdin.write(bytes(toTranslate, 'utf-8')) deformatted = proc_deformat.communicate()[0] @@ -164,8 +251,8 @@ output.append(toTranslate) output.append(towrite.decode('utf-8')) - pipeline = [] - pipeline.append("apertium-deshtml") + all_cmds = [] + all_cmds.append("apertium-deshtml") for cmd in commands: proc = Popen(cmd, stdin=PIPE, stdout=PIPE) @@ -173,40 +260,29 @@ towrite = proc.communicate()[0] output.append(towrite.decode('utf-8')) - pipeline.append(cmd) + all_cmds.append(cmd) proc_reformat = Popen("apertium-rehtml-noent", stdin=PIPE, stdout=PIPE) proc_reformat.stdin.write(towrite) towrite = proc_reformat.communicate()[0].decode('utf-8') output.append(towrite) - pipeline.append("apertium-rehtml-noent") + all_cmds.append("apertium-rehtml-noent") - return output, pipeline + return output, all_cmds @gen.coroutine def translateSimple(toTranslate, commands): proc_in, proc_out = startPipeline(commands) - assert(proc_in==proc_out) - yield proc_in.stdin.write(bytes(toTranslate, 'utf-8')) + assert proc_in == proc_out + yield gen.Task(proc_in.stdin.write, bytes(toTranslate, 'utf-8')) proc_in.stdin.close() - translated = yield proc_out.stdout.read_until_close() + translated = yield gen.Task(proc_out.stdout.read_until_close) proc_in.stdout.close() return translated.decode('utf-8') -def translateDoc(fileToTranslate, format, modeFile): 
- modesdir=os.path.dirname(os.path.dirname(modeFile)) - mode=os.path.splitext(os.path.basename(modeFile))[0] - return Popen(['apertium', '-f', format, '-d', modesdir, mode], - stdin=fileToTranslate, stdout=PIPE).communicate()[0] - -@gen.coroutine -def translate(toTranslate, lock, pipeline, commands): - if pipeline: - allSplit = splitForTranslation(toTranslate, rush_hour = lock.locked()) - parts = yield [translateNULFlush(part, lock, pipeline) for part in allSplit] - return "".join(parts) - else: - with (yield lock.acquire()): - res = yield translateSimple(toTranslate, commands) - return res +def translateDoc(fileToTranslate, fmt, modeFile): + modesdir = os.path.dirname(os.path.dirname(modeFile)) + mode = os.path.splitext(os.path.basename(modeFile))[0] + return Popen(['apertium', '-f', fmt, '-d', modesdir, mode], + stdin=fileToTranslate, stdout=PIPE).communicate()[0]
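Editor's note on the new pipeline pool in servlet.py/translation.py: each language pair now maps to a heap of Pipeline objects ordered by Pipeline.__lt__ on the users counter, so self.pipelines[pair][0] is always the least-loaded pipe, and shouldStartPipe() only spins up another one when that pipe exceeds --max-users-per-pipe and the pair is still under --max-pipes-per-pair. The following standalone sketch demonstrates just this bookkeeping; DemoPipeline, get_pipeline and the two constants are illustrative stand-ins, not code from the patch:

import heapq
from contextlib import contextmanager

MAX_PIPES_PER_PAIR = 2   # stand-in for --max-pipes-per-pair
MAX_USERS_PER_PIPE = 5   # stand-in for --max-users-per-pipe

class DemoPipeline:
    """Bookkeeping-only stand-in for translation.Pipeline (no subprocesses)."""
    def __init__(self, name):
        self.name = name
        self.users = 0      # concurrent requests currently inside use()
        self.useCount = 0   # lifetime request count, drives --restart-pipe-after

    def __lt__(self, other):
        # Ordering by load lets heapq keep the least-busy pipe at pool[0].
        return self.users < other.users

    @contextmanager
    def use(self):
        self.users += 1
        try:
            yield
        finally:
            self.users -= 1
            self.useCount += 1

def get_pipeline(pool):
    # Mirrors shouldStartPipe()/getPipeline(): start a pipe if there is none,
    # or if the least-busy pipe is over capacity and the pool may still grow.
    if not pool or (len(pool) < MAX_PIPES_PER_PAIR
                    and pool[0].users > MAX_USERS_PER_PIPE):
        heapq.heappush(pool, DemoPipeline('pipe%d' % (len(pool) + 1)))
    return pool[0]

if __name__ == '__main__':
    pool, open_requests = [], []
    # Simulate seven overlapping requests: the seventh finds pipe1 with six
    # concurrent users (> MAX_USERS_PER_PIPE), so a second pipe is started.
    for i in range(7):
        pipe = get_pipeline(pool)
        ctx = pipe.use()
        ctx.__enter__()                      # request begins
        open_requests.append((pipe, ctx))
        print(i + 1, '->', pipe.name, [(p.name, p.users) for p in pool])
    for pipe, ctx in open_requests:
        ctx.__exit__(None, None, None)       # request ends

The same caveat applies here as in the patch: users is mutated while a pipe sits in the heap, so the heap order can go stale; cleanPairs() compensates by calling heapq.heapify() after filtering.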
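Editor's note on the splitting changes: hardbreakFn() now keys off a concrete n_users count (more than two concurrent users means short 1000-byte chunks) instead of the old rush_hour flag, and preferPunctBreak() gained an early return plus the +1 offsets so a chunk now ends just after a period or space rather than just before it. A condensed sketch of the strategy under stated simplifications (character counts instead of upToBytes() byte counts, a fixed hardbreak, and no rounds<10 guard; the snake_case names are mine, not the patch's):

def prefer_punct_break(text, last, hardbreak):
    # Remaining text fits in one chunk: take all of it.
    if len(text[last:]) <= hardbreak:
        return last + hardbreak + 1
    soft = last + hardbreak // 2 + 1
    hard = last + hardbreak
    # Prefer a period, then a space, in the second half of the window,
    # cutting just after the separator.
    for sep in ('.', ' '):
        cut = text.rfind(sep, soft, hard)
        if cut > -1:
            return cut + 1
    return hard

def split_for_translation(text, hardbreak):
    parts, last = [], 0
    while last < len(text):
        nxt = prefer_punct_break(text, last, hardbreak)
        parts.append(text[last:nxt])
        last = nxt
    return parts

print(split_for_translation('One sentence. Another one follows here.', 16))
# -> ['One sentence.', ' Another one ', 'follows here.']

Keeping chunks at or below PIPE_BUF in the real code is what prevents the deadlock the splitForTranslation() docstring warns about: a full FIFO buffer blocking both reader and writer.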