diff -Nru stb-tester-22/CONTRIBUTING.md stb-tester-23-1-gf70a21c/CONTRIBUTING.md
--- stb-tester-22/CONTRIBUTING.md	1970-01-01 00:00:00.000000000 +0000
+++ stb-tester-23-1-gf70a21c/CONTRIBUTING.md	2015-07-08 17:05:05.000000000 +0000
@@ -0,0 +1,109 @@
+Contributing to stb-tester
+==========================
+
+Our preferred workflow is via GitHub Pull Requests.
+
+Feel free to open a pull request if you want to start a discussion, even if
+your implementation isn't complete (but please make it clear that this is the
+case; we like GitHub's [TODO lists] for works in progress).
+
+Here are a few guidelines to keep in mind when submitting a pull request:
+
+* Clean commit history: Keep refactorings and functional changes in separate
+  commits.
+
+* Commit messages: Short one-line summary, followed by a blank line, followed
+  by as many paragraphs of explanation as needed. Think of the reviewer when
+  you're writing this: it is the place to clarify any subtleties in your
+  implementation, to document other approaches you tried that didn't work,
+  any limitations of your implementation, and so on. Most importantly,
+  describe *why* you made the change, not just *what* the change is.
+
+* If your change is visible to users, please add a bullet point in
+  `docs/release-notes.md` under the next unreleased version. Keep this
+  succinct and think of what a *user* of stb-tester needs to know.
+
+  If you're not very confident in your English, you can skip this step and we
+  will be happy to write the release note for your change.
+
+* Ensure that `make check` passes.
+
+  * If you're submitting a change to `stbt camera`, use
+    `make check enable_stbt_camera=yes`.
+
+  * We use the [Travis CI] service to automatically run `make check` on all
+    pull requests. However, Travis uses Ubuntu 12.04, which is missing some
+    of our dependencies, so it won't run *all* of our self-tests. It is
+    therefore still important that you run `make check` yourself.
+
+    If you would like Travis to test your branches on your own fork of
+    stb-tester before you raise a pull request, follow steps 1 and 2 of the
+    [Travis set-up instructions]; after you push commits to your fork you'll
+    see the Travis results in the [GitHub branches view].
+
+* New features must be accompanied by self-tests.
+
+  * If your change is a bug-fix, write a regression test if feasible.
+
+  * We write Python unit tests using [nose]: Just add a function named
+    `test_*` in the appropriate Python file under `tests/`, and use `assert`
+    to indicate test failure.
+
+  * We write end-to-end tests in bash: See the functions named `test_*` in
+    `tests/test-*.sh`.
+
+* If you add new run-time dependencies:
+
+  * The dependencies must be available in the Ubuntu and Fedora repositories
+    for all [Ubuntu current releases] and [Fedora current releases]. Python
+    packages that are only available from PyPI or from PPAs aren't allowed.
+
+  * Add the dependencies to the Fedora package in
+    `extra/fedora/stb-tester.spec.in`.
+
+  * Add the dependencies to the Ubuntu package in `extra/debian/control`.
+    Note that you may need to list the new dependency under "Build-Depends"
+    (if it's needed to build stb-tester or to run the self-tests) *and*
+    under "Depends" (if it's needed at run-time).
+
+  * If you really want to do a thorough job, test the new deb/rpm packages:
+
+    * Fedora:
+
+      * Build the rpm with `make srpm && sudo make rpm` (it needs sudo to
+        run [yum-builddep]). If you don't have a Fedora host you can use
+        `extra/fedora/fedora-shell.sh -c "make srpm && sudo make rpm"`,
+        which spins up a Fedora container using docker (it will leave the
+        built rpm package in the current directory on the host).
+
+      * Then test the rpm by running `extra/fedora/test-rpm.sh `. It will
+        use docker to install the rpm inside a pristine Fedora container,
+        and run stb-tester's self-tests.
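The nose unit-test convention described above (a `test_*` function using plain `assert`) can be sketched as follows. This is an illustrative, self-contained example only: the `normalise_key` helper is invented for the sketch and is not part of the stb-tester codebase.

```python
# Hypothetical sketch of a nose-style unit test as described above.
# nose collects any function named test_* from the test modules and
# treats a failing `assert` as a test failure -- no test-runner
# imports are needed in the test file itself.

def normalise_key(name):
    # Toy stand-in for real stb-tester code under test.
    return name.strip().upper().replace(" ", "_")


def test_normalise_key_strips_and_uppercases():
    assert normalise_key(" ok ") == "OK"
    assert normalise_key("channel up") == "CHANNEL_UP"
```

Running `nosetests` in a directory containing such a file would discover and run the `test_*` function automatically.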
+        Even if your host system is running Fedora, there is value in
+        testing the rpm in a docker container to make sure that it doesn't
+        have undeclared dependencies that you happen to have installed.
+
+    * Debian:
+
+      * Build the package with `make deb`. (TODO: Docker script to build
+        the deb if your host system isn't Debian/Ubuntu.)
+
+      * Test the deb package by running `make check-ubuntu`. It will use
+        docker to install the deb package inside a pristine Ubuntu
+        container, and run stb-tester's self-tests.
+
+  * You'll also need to list the new dependencies in `.travis.yml`, if they
+    are required by any self-tests (and if they aren't: why not?).
+
+Finally, please be patient with us if the review process takes a while. We
+really do appreciate your contribution.
+
+
+[TODO lists]: https://github.com/blog/1375-task-lists-in-gfm-issues-pulls-comments
+[nose]: https://nose.readthedocs.org/
+[Travis CI]: https://travis-ci.org/
+[Travis set-up instructions]: http://docs.travis-ci.com/user/getting-started/
+[GitHub branches view]: https://github.com/stb-tester/stb-tester/branches
+[Ubuntu current releases]: https://wiki.ubuntu.com/Releases#Current
+[Fedora current releases]: https://fedoraproject.org/wiki/Releases#Current_Supported_Releases
+[yum-builddep]: http://linuxmanpages.net/manpages/fedora21/man1/yum-builddep.1.html
diff -Nru stb-tester-22/debian/changelog stb-tester-23-1-gf70a21c/debian/changelog
--- stb-tester-22/debian/changelog	2015-03-27 13:42:47.000000000 +0000
+++ stb-tester-23-1-gf70a21c/debian/changelog	2015-07-08 17:07:52.000000000 +0000
@@ -1,5 +1,5 @@
-stb-tester (22-1~utopic) utopic; urgency=medium
+stb-tester (23-1-gf70a21c-1~utopic) utopic; urgency=medium
 
 * Created from stb-tester git repo http://github.com/stb-tester/stb-tester
 
- -- David Röthlisberger Fri, 27 Mar 2015 13:35:10 +0000
+ -- David Röthlisberger Wed, 8 Jul 2015 18:05:05 +0100
diff -Nru stb-tester-22/debian/control stb-tester-23-1-gf70a21c/debian/control
--- 
stb-tester-22/debian/control 2015-03-27 13:42:47.000000000 +0000 +++ stb-tester-23-1-gf70a21c/debian/control 2015-07-08 17:07:52.000000000 +0000 @@ -30,11 +30,15 @@ python-gobject, python-jinja2, python-lxml, + python-lzma, + python-mock, python-nose, python-opencv, + python-pysnmp4, python-scipy, python-serial, python-yaml, + socat, tesseract-ocr, tesseract-ocr-deu, tesseract-ocr-eng, @@ -47,6 +51,7 @@ ${misc:Depends}, curl, gir1.2-gstreamer-1.0, + git, gstreamer1.0-libav, gstreamer1.0-plugins-bad, gstreamer1.0-plugins-base, @@ -63,9 +68,11 @@ python-gobject, python-jinja2, python-lxml, + python-lzma, python-opencv, + python-pysnmp4, python-serial, - snmp, + socat, tesseract-ocr, tesseract-ocr-eng, xdotool diff -Nru stb-tester-22/docs/ocr.md stb-tester-23-1-gf70a21c/docs/ocr.md --- stb-tester-22/docs/ocr.md 2015-03-27 13:35:10.000000000 +0000 +++ stb-tester-23-1-gf70a21c/docs/ocr.md 2015-07-08 17:05:05.000000000 +0000 @@ -13,6 +13,29 @@ characters were misread. This is helpful when reading real words but it can get in the way when reading characters and words with a different structure. +## General tips + +* Crop the region in which you are performing OCR tight to the text using the + `region` parameter to `ocr` and `match_text` + +## Matching some known text + +* Use the `tesseract_user_words` or `tesseract_user_patterns` parameters to + `ocr` and `match_text` to tell the OCR engine what you're expecting. +* Use fuzzy matching to check if the returned text matches what you were + expecting. e.g. 
a function like:
+
+    def fuzzy_match(string1, string2, threshold=0.8):
+        import difflib
+        return difflib.SequenceMatcher(None, string1, string2).ratio() >= threshold
+
+### Example
+
+Looking for the text "EastEnders":
+
+    text = stbt.ocr(region=stbt.Region(52, 34, 120, 50))
+    assert fuzzy_match(text, "EastEnders")
+
 ## Matching serial numbers
 
 For example, here is a code generated at random by one user's set-top box, for
diff -Nru stb-tester-22/docs/release-notes.md stb-tester-23-1-gf70a21c/docs/release-notes.md
--- stb-tester-22/docs/release-notes.md	2015-03-27 13:35:10.000000000 +0000
+++ stb-tester-23-1-gf70a21c/docs/release-notes.md	2015-07-08 17:05:05.000000000 +0000
@@ -14,6 +14,64 @@
 For installation instructions see [Getting Started](
 https://github.com/stb-tester/stb-tester/wiki/Getting-started-with-stb-tester).
 
+#### 23
+
+New `stbt batch run --shuffle` option to run test cases in a random order.
+
+8 July 2015.
+
+##### User-visible changes since 22
+
+* `stbt batch run` learned a new option: `--shuffle`. The `--shuffle` option
+  runs the given test cases in a random order. This can be useful if you have
+  structured your test pack as a large number of short, targeted tests. You
+  can then use:
+
+      stbt batch run --shuffle \
+          epg.py::test1 epg.py::test2 menu.py::test1 menu.py::test2 ...
+
+  to attempt a random walk of different journeys through your set-top box's
+  UI. This can be particularly effective at finding hard-to-trigger bugs and
+  at getting more value out of the test cases you have already written.
+
+  Some tests may take much longer than others, and would therefore use up a
+  disproportionate amount of your soaking time. To work around this we
+  measure how long each test takes the first time it is run, and use that as
+  a weighting when choosing the next test to run, attempting to equalise the
+  time spent in each test case.
+
+  This makes it reasonable to include both tests that take 10 seconds and
+  tests that take 10 minutes in the same random soak.
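The duration-based weighting described in the release note above could be sketched roughly like this. This is an illustrative approximation only: `pick_next_test` and its inverse-duration weighting scheme are invented for the sketch and are not the actual `stbt batch run` implementation.

```python
import random


def pick_next_test(durations):
    """Choose the next test to run, weighted inversely by each test's
    measured duration, so that short and long tests accumulate roughly
    equal total soak time.

    `durations` maps test name -> duration (in seconds) of its first run.
    """
    names = list(durations)
    # A 10-second test gets 60x the weight of a 10-minute test, so over
    # a long soak both end up with a similar share of the running time.
    weights = [1.0 / durations[name] for name in names]
    threshold = random.uniform(0, sum(weights))
    for name, weight in zip(names, weights):
        threshold -= weight
        if threshold <= 0:
            return name
    return names[-1]
```

A soak runner would call this in a loop, recording each test's duration on its first run and then reusing those measurements for subsequent picks.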
+
+* There is new structured logging/tracing infrastructure that allows
+  monitoring what `stbt run` is doing in real time and saving this data for
+  replay and analysis later. `stbt run` will write this data to a file if it
+  is given the `--save-trace` command-line option, and will write it to the
+  unix domain socket given by the `STBT_TRACING_SOCKET` environment variable.
+
+  This is used by the stb-tester ONE to display the current status of the
+  executing test. The tools for replay have not yet been written. For more
+  information, including the format definition, see `_stbt/state_watch.py`.
+
+* The text drawn by `stbt.draw_text` and `stbt.press` on the recorded video
+  now fades out over a few seconds. This makes it easier to distinguish new
+  messages from old ones.
+
+##### Developer-visible changes since 22
+
+* Much of the code has moved from `stbt/__init__.py` to `_stbt/core.py`.
+  This is part of the work in progress to allow `stbt` to be used as a
+  library from other Python code, without having to use `stbt run`.
+
+* `stbt batch run` has been partially rewritten in Python for better
+  maintainability and extensibility.
+
+* `stbt batch run` now uses process groups to keep track of its children,
+  rather than just using the process hierarchy.
+
+* Support for the ATEN network-controlled power supply has been rewritten in
+  Python (from bash). (Thanks to Martyn Jarvis and YouView for the patch.)
+
 #### 22
 
 Support for testcases as Python functions and using *assert* in testcases; new
@@ -136,7 +194,7 @@
    CSS instead of truncating it in the HTML; this allows the full text of the
    table cell to be searchable from the search box at the top of the report.
 
-* API: `is_frame_black()` now no longer requires a frame to be passed in. If
+* API: `is_screen_black()` now no longer requires a frame to be passed in. If
   one is not specified it will be grabbed from live video, much like `match()`.
* `stbt power`: Added support for "Aviosys USB Net Power 8800 Pro" diff -Nru stb-tester-22/extra/debian/control stb-tester-23-1-gf70a21c/extra/debian/control --- stb-tester-22/extra/debian/control 2015-03-27 13:35:10.000000000 +0000 +++ stb-tester-23-1-gf70a21c/extra/debian/control 2015-07-08 17:05:05.000000000 +0000 @@ -30,11 +30,15 @@ python-gobject, python-jinja2, python-lxml, + python-lzma, + python-mock, python-nose, python-opencv, + python-pysnmp4, python-scipy, python-serial, python-yaml, + socat, tesseract-ocr, tesseract-ocr-deu, tesseract-ocr-eng, @@ -47,6 +51,7 @@ ${misc:Depends}, curl, gir1.2-gstreamer-1.0, + git, gstreamer1.0-libav, gstreamer1.0-plugins-bad, gstreamer1.0-plugins-base, @@ -63,9 +68,11 @@ python-gobject, python-jinja2, python-lxml, + python-lzma, python-opencv, + python-pysnmp4, python-serial, - snmp, + socat, tesseract-ocr, tesseract-ocr-eng, xdotool diff -Nru stb-tester-22/extra/fedora/copr-publish.sh stb-tester-23-1-gf70a21c/extra/fedora/copr-publish.sh --- stb-tester-22/extra/fedora/copr-publish.sh 2015-03-27 13:35:10.000000000 +0000 +++ stb-tester-23-1-gf70a21c/extra/fedora/copr-publish.sh 2015-07-08 17:05:05.000000000 +0000 @@ -13,15 +13,14 @@ [[ -n "$src_rpm" ]] && tmpdir=$(mktemp -d --tmpdir stb-tester-copr-publish.XXXXXX) && trap "rm -rf $tmpdir" EXIT && -git clone --depth 1 https://github.com/stb-tester/stb-tester-srpms.git \ +git clone --depth 1 git@github.com:stb-tester/stb-tester-srpms.git \ $tmpdir/stb-tester-srpms && cp $src_rpm $tmpdir/stb-tester-srpms && -cd $tmpdir/stb-tester-srpms && -git add $src_rpm && -git commit -m "$src_rpm" && -git push origin master && +git -C $tmpdir/stb-tester-srpms add $src_rpm && +git -C $tmpdir/stb-tester-srpms commit -m "$src_rpm" && +git -C $tmpdir/stb-tester-srpms push origin master && echo "Published srpm to https://github.com/stb-tester/stb-tester-srpms" && -copr-cli build stb-tester \ - https://github.com/stb-tester/stb-tester-srpms/raw/master/$src_rpm && +"$(dirname 
"$0")"/fedora-shell.sh -c "copr-cli build stb-tester \ + https://github.com/stb-tester/stb-tester-srpms/raw/master/$src_rpm" && echo "Kicked off copr build" && echo "See http://copr.fedoraproject.org/coprs/stbt/stb-tester/" diff -Nru stb-tester-22/extra/fedora/README.md stb-tester-23-1-gf70a21c/extra/fedora/README.md --- stb-tester-22/extra/fedora/README.md 2015-03-27 13:35:10.000000000 +0000 +++ stb-tester-23-1-gf70a21c/extra/fedora/README.md 2015-07-08 17:05:05.000000000 +0000 @@ -7,5 +7,4 @@ extra/fedora/fedora-shell.sh -c "make srpm; sudo make rpm" extra/fedora/test-rpm.sh stb-tester-$version-1.fc20.x86_64.rpm - extra/fedora/fedora-shell.sh -c \ - "extra/fedora/copr-publish.sh stb-tester-$version-1.fc20.src.rpm" + extra/fedora/copr-publish.sh stb-tester-$version-1.fc20.src.rpm diff -Nru stb-tester-22/extra/fedora/stb-tester.spec.in stb-tester-23-1-gf70a21c/extra/fedora/stb-tester.spec.in --- stb-tester-22/extra/fedora/stb-tester.spec.in 2015-03-27 13:35:10.000000000 +0000 +++ stb-tester-23-1-gf70a21c/extra/fedora/stb-tester.spec.in 2015-07-08 17:05:05.000000000 +0000 @@ -17,18 +17,19 @@ Requires: libvpx Requires: lsof Requires: moreutils -Requires: net-snmp-utils Requires: opencv Requires: opencv-python Requires: openssh-clients Requires: pygobject3 Requires: pylint Requires: pyserial +Requires: pysnmp Requires: python >= 2.7 Requires: python-enum34 Requires: python-flask Requires: python-jinja2 Requires: python-lxml +Requires: socat Requires: tesseract %description diff -Nru stb-tester-22/extra/fedora/test-rpm.sh stb-tester-23-1-gf70a21c/extra/fedora/test-rpm.sh --- stb-tester-22/extra/fedora/test-rpm.sh 2015-03-27 13:35:10.000000000 +0000 +++ stb-tester-23-1-gf70a21c/extra/fedora/test-rpm.sh 2015-07-08 17:05:05.000000000 +0000 @@ -1,7 +1,6 @@ #!/bin/bash -rpm=$1 -[[ -n "$rpm" ]] || { echo "error: rpm file not specified" >&2; exit 1; } +[[ $# -gt 0 ]] || { echo "error: No rpm files specified" >&2; exit 1; } this_dir=$(dirname $0) stbt_dir=$(cd $this_dir/../.. 
&& pwd) @@ -10,12 +9,14 @@ docker rm -f test-stb-tester-fedora-rpm &>/dev/null docker run -t \ --name test-stb-tester-fedora-rpm \ - -v $(pwd)/$rpm:/tmp/$rpm:ro \ + $(for rpm in "$@"; do + echo "-v $PWD/$rpm:/tmp/$rpm:ro" + done | tr '\n' ' ') \ -v $stbt_dir:/usr/src/stb-tester:ro \ fedora:20 \ /bin/bash -c " set -x && - sudo yum install -y /tmp/$rpm && + sudo yum install -y ${*/#/tmp/} && stbt --version && stbt --help && man stbt | cat && diff -Nru stb-tester-22/Makefile stb-tester-23-1-gf70a21c/Makefile --- stb-tester-22/Makefile 2015-03-27 13:35:10.000000000 +0000 +++ stb-tester-23-1-gf70a21c/Makefile 2015-07-08 17:05:05.000000000 +0000 @@ -17,6 +17,12 @@ gstsystempluginsdir=$(shell pkg-config --variable=pluginsdir gstreamer-1.0) gstpluginsdir?=$(if $(filter $(HOME)%,$(prefix)),$(gsthomepluginsdir),$(gstsystempluginsdir)) +# Enable building/installing man page +enable_docs:=$(shell which rst2man >/dev/null 2>&1 && echo yes || echo no) +ifeq ($(enable_docs), no) + $(info Not building/installing documentation because 'rst2man' was not found) +endif + # Enable building/installing stbt camera (smart TV support). enable_stbt_camera?=no @@ -41,7 +47,7 @@ # VERSION file included in the dist tarball otherwise. 
generate_version := $(shell \ GIT_DIR=.git git describe --always --dirty > VERSION.now 2>/dev/null && \ - sed --in-place "s/^v//g" VERSION.now && \ + perl -pi -e 's/^v//' VERSION.now && \ { cmp VERSION.now VERSION 2>/dev/null || mv VERSION.now VERSION; }; \ rm -f VERSION.now) VERSION?=$(shell cat VERSION) @@ -51,7 +57,7 @@ .DELETE_ON_ERROR: -all: stbt.sh stbt.1 defaults.conf extra/fedora/stb-tester.spec +all: stbt.sh defaults.conf extra/fedora/stb-tester.spec extra/fedora/stb-tester.spec stbt.sh: \ %: %.in .stbt-prefix VERSION @@ -68,7 +74,7 @@ $< > $@ install: install-core -install-core: stbt.sh stbt.1 defaults.conf +install-core: stbt.sh defaults.conf $(INSTALL) -m 0755 -d \ $(DESTDIR)$(bindir) \ $(DESTDIR)$(libexecdir)/stbt \ @@ -77,7 +83,6 @@ $(DESTDIR)$(libexecdir)/stbt/stbt-batch.d \ $(DESTDIR)$(libexecdir)/stbt/stbt-batch.d/static \ $(DESTDIR)$(libexecdir)/stbt/stbt-batch.d/templates \ - $(DESTDIR)$(man1dir) \ $(DESTDIR)$(sysconfdir)/stbt \ $(DESTDIR)$(sysconfdir)/bash_completion.d $(INSTALL) -m 0755 stbt.sh $(DESTDIR)$(bindir)/stbt @@ -87,18 +92,21 @@ _stbt/__init__.py \ _stbt/config.py \ _stbt/control.py \ + _stbt/core.py \ _stbt/gst_hacks.py \ _stbt/irnetbox.py \ _stbt/logging.py \ _stbt/power.py \ _stbt/pylint_plugin.py \ + _stbt/state_watch.py \ _stbt/stbt-power.sh \ _stbt/utils.py \ $(DESTDIR)$(libexecdir)/stbt/_stbt $(INSTALL) -m 0644 stbt/__init__.py $(DESTDIR)$(libexecdir)/stbt/stbt $(INSTALL) -m 0644 defaults.conf $(DESTDIR)$(libexecdir)/stbt/stbt.conf $(INSTALL) -m 0755 \ - stbt-batch.d/run \ + stbt-batch.d/run.py \ + stbt-batch.d/run-one \ stbt-batch.d/report \ stbt-batch.d/instaweb \ $(DESTDIR)$(libexecdir)/stbt/stbt-batch.d @@ -111,7 +119,6 @@ stbt-batch.d/templates/index.html \ stbt-batch.d/templates/testrun.html \ $(DESTDIR)$(libexecdir)/stbt/stbt-batch.d/templates - $(INSTALL) -m 0644 stbt.1 $(DESTDIR)$(man1dir) $(INSTALL) -m 0644 stbt.conf $(DESTDIR)$(sysconfdir)/stbt $(INSTALL) -m 0644 stbt-completion \ 
$(DESTDIR)$(sysconfdir)/bash_completion.d/stbt @@ -126,15 +133,6 @@ -rmdir $(DESTDIR)$(sysconfdir)/stbt -rmdir $(DESTDIR)$(sysconfdir)/bash_completion.d -doc: stbt.1 - -# Requires python-docutils -stbt.1: README.rst VERSION - sed -e 's/@VERSION@/$(VERSION)/g' $< |\ - sed -e '/\.\. image::/,/^$$/ d' |\ - sed -e 's/(callable_,/(`callable_`,/' |\ - rst2man > $@ - clean: git clean -Xfd || true @@ -228,6 +226,26 @@ TAGS: etags stbt/**.py _stbt/**.py +### Documentation ############################################################ + +doc: stbt.1 + +# Requires python-docutils +stbt.1: README.rst VERSION + sed -e 's/@VERSION@/$(VERSION)/g' $< |\ + sed -e '/\.\. image::/,/^$$/ d' |\ + sed -e 's/(callable_,/(`callable_`,/' |\ + rst2man > $@ + +ifeq ($(enable_docs), yes) + all: stbt.1 + install: install-docs +endif + +install-docs: stbt.1 + $(INSTALL) -m 0755 -d $(DESTDIR)$(man1dir) + $(INSTALL) -m 0644 stbt.1 $(DESTDIR)$(man1dir) + ### Debian Packaging ######################################################### ubuntu_releases ?= trusty utopic diff -Nru stb-tester-22/stbt/__init__.py stb-tester-23-1-gf70a21c/stbt/__init__.py --- stb-tester-22/stbt/__init__.py 2015-03-27 13:35:10.000000000 +0000 +++ stb-tester-23-1-gf70a21c/stbt/__init__.py 2015-07-08 17:05:05.000000000 +0000 @@ -4,42 +4,34 @@ See `man stbt` and http://stb-tester.com for documentation. Copyright 2012-2013 YouView TV Ltd and contributors. +Copyright 2013-2015 stb-tester.com Ltd. License: LGPL v2.1 or (at your option) any later version (see https://github.com/stb-tester/stb-tester/blob/master/LICENSE for details). 
""" from __future__ import absolute_import -import argparse -import datetime -import functools -import glob -import inspect -import os -import Queue -import re -import subprocess -import threading -import time -import traceback -import warnings -from collections import deque, namedtuple -from contextlib import contextmanager -from distutils.version import LooseVersion - -import cv2 -import gi -import numpy -from enum import IntEnum -from gi.repository import GLib, GObject, Gst # pylint: disable=E0611 - -from _stbt import config -from _stbt import control -from _stbt import logging -from _stbt import utils -from _stbt.config import ConfigurationError, get_config -from _stbt.gst_hacks import gst_iterate, map_gst_buffer -from _stbt.logging import debug, ddebug, warn +import _stbt.core +from _stbt.core import \ + as_precondition, \ + debug, \ + get_config, \ + ConfigurationError, \ + MatchParameters, \ + MatchResult, \ + MatchTimeout, \ + MotionResult, \ + MotionTimeout, \ + NoVideo, \ + OcrMode, \ + Position, \ + PreconditionError, \ + Region, \ + save_frame, \ + TextMatchResult, \ + UITestError, \ + UITestFailure, \ + wait_until __all__ = [ "as_precondition", @@ -76,17 +68,12 @@ "wait_until", ] -if getattr(gi, "version_info", (0, 0, 0)) < (3, 12, 0): - GObject.threads_init() -Gst.init(None) - -warnings.filterwarnings( - action="always", category=DeprecationWarning, message='.*stb-tester') - +_dut = _stbt.core.DeviceUnderTest() # Functions available to stbt scripts # =========================================================================== + def press(key, interpress_delay_secs=None): """Send the specified key-press to the system under test. @@ -103,25 +90,7 @@ setting ``interpress_delay_secs`` in the ``[press]`` section of stbt.conf. 
""" - if interpress_delay_secs is None: - interpress_delay_secs = get_config( - "press", "interpress_delay_secs", type_=float) - if getattr(press, 'time_of_last_press', None): - # `sleep` is inside a `while` loop because the actual suspension time - # of `sleep` may be less than that requested. - while True: - seconds_to_wait = ( - press.time_of_last_press - datetime.datetime.now() + - datetime.timedelta(seconds=interpress_delay_secs) - ).total_seconds() - if seconds_to_wait > 0: - time.sleep(seconds_to_wait) - else: - break - - _control.press(key) - press.time_of_last_press = datetime.datetime.now() - draw_text(key, duration_secs=3) + return _dut.press(key, interpress_delay_secs) def draw_text(text, duration_secs=3): @@ -132,415 +101,7 @@ :param duration_secs: The number of seconds to display the text. :type duration_secs: int or float """ - _display.draw(text, duration_secs) - - -class MatchParameters(object): - """Parameters to customise the image processing algorithm used by - `match`, `wait_for_match`, and `press_until_match`. - - You can change the default values for these parameters by setting a key - (with the same name as the corresponding python parameter) in the `[match]` - section of stbt.conf. But we strongly recommend that you don't change the - default values from what is documented here. - - You should only need to change these parameters when you're trying to match - a template image that isn't actually a perfect match -- for example if - there's a translucent background with live TV visible behind it; or if you - have a template image of a button's background and you want it to match even - if the text on the button doesn't match. - - :param str match_method: - The method to be used by the first pass of stb-tester's image matching - algorithm, to find the most likely location of the "template" image - within the larger source image. - - Allowed values are "sqdiff-normed", "ccorr-normed", and "ccoeff-normed". 
- For the meaning of these parameters, see OpenCV's `cvMatchTemplate - `_. - - We recommend that you don't change this from its default value of - "sqdiff-normed". - - :param float match_threshold: - How strong a result from the first pass must be, to be considered a - match. Valid values range from 0 (anything is considered to match) - to 1 (the match has to be pixel perfect). This defaults to 0.8. - - :param str confirm_method: - The method to be used by the second pass of stb-tester's image matching - algorithm, to confirm that the region identified by the first pass is a - good match. - - The first pass often gives false positives (it reports a "match" for an - image that shouldn't match). The second pass is more CPU-intensive, but - it only checks the position of the image that the first pass identified. - The allowed values are: - - :"none": - Do not confirm the match. Assume that the potential match found is - correct. - - :"absdiff": - Compare the absolute difference of each pixel from the template image - against its counterpart from the candidate region in the source video - frame. - - :"normed-absdiff": - Normalise the pixel values from both the template image and the - candidate region in the source video frame, then compare the absolute - difference as with "absdiff". - - This gives better results with low-contrast images. We recommend setting - this as the default `confirm_method` in stbt.conf, with a - `confirm_threshold` of 0.30. - - :param float confirm_threshold: - The maximum allowed difference between any given pixel from the template - image and its counterpart from the candidate region in the source video - frame, as a fraction of the pixel's total luminance range. - - Valid values range from 0 (more strict) to 1.0 (less strict). - Useful values tend to be around 0.16 for the "absdiff" method, and 0.30 - for the "normed-absdiff" method. 
- - :param int erode_passes: - After the "absdiff" or "normed-absdiff" absolute difference is taken, - stb-tester runs an erosion algorithm that removes single-pixel differences - to account for noise. Useful values are 1 (the default) and 0 (to disable - this step). - - """ - - def __init__(self, match_method=None, match_threshold=None, - confirm_method=None, confirm_threshold=None, - erode_passes=None): - if match_method is None: - match_method = get_config('match', 'match_method') - if match_threshold is None: - match_threshold = get_config( - 'match', 'match_threshold', type_=float) - if confirm_method is None: - confirm_method = get_config('match', 'confirm_method') - if confirm_threshold is None: - confirm_threshold = get_config( - 'match', 'confirm_threshold', type_=float) - if erode_passes is None: - erode_passes = get_config('match', 'erode_passes', type_=int) - - if match_method not in ( - "sqdiff-normed", "ccorr-normed", "ccoeff-normed"): - raise ValueError("Invalid match_method '%s'" % match_method) - if confirm_method not in ("none", "absdiff", "normed-absdiff"): - raise ValueError("Invalid confirm_method '%s'" % confirm_method) - - self.match_method = match_method - self.match_threshold = match_threshold - self.confirm_method = confirm_method - self.confirm_threshold = confirm_threshold - self.erode_passes = erode_passes - - -class Position(namedtuple('Position', 'x y')): - """A point within the video frame. - - `x` and `y` are integer coordinates (measured in number of pixels) from the - top left corner of the video frame. - """ - pass - - -class Region(namedtuple('Region', 'x y right bottom')): - u""" - ``Region(x, y, width=width, height=height)`` or - ``Region(x, y, right=right, bottom=bottom)`` - - Rectangular region within the video frame. 
- - For example, given the following regions a, b, and c:: - - - 01234567890123 - 0 ░░░░░░░░ - 1 ░a░░░░░░ - 2 ░░░░░░░░ - 3 ░░░░░░░░ - 4 ░░░░▓▓▓▓░░▓c▓ - 5 ░░░░▓▓▓▓░░▓▓▓ - 6 ░░░░▓▓▓▓░░░░░ - 7 ░░░░▓▓▓▓░░░░░ - 8 ░░░░░░b░░ - 9 ░░░░░░░░░ - - >>> a = Region(0, 0, width=8, height=8) - >>> b = Region(4, 4, right=13, bottom=10) - >>> c = Region(10, 4, width=3, height=2) - >>> a.right - 8 - >>> b.bottom - 10 - >>> b.contains(c), a.contains(b), c.contains(b) - (True, False, False) - >>> b.extend(x=6, bottom=-4) == c - True - >>> a.extend(right=5).contains(c) - True - >>> a.width, a.extend(x=3).width, a.extend(right=-3).width - (8, 5, 5) - >>> print Region.intersect(a, b) - Region(x=4, y=4, width=4, height=4) - >>> Region.intersect(a, b) == Region.intersect(b, a) - True - >>> Region.intersect(c, b) == c - True - >>> print Region.intersect(a, c) - None - >>> print Region.intersect(None, a) - None - >>> quadrant = Region(x=float("-inf"), y=float("-inf"), right=0, bottom=0) - >>> quadrant.translate(2, 2) - Region(x=-inf, y=-inf, right=2, bottom=2) - >>> print c.translate(x=-9, y=-3) - Region(x=1, y=1, width=3, height=2) - >>> Region.intersect(Region.ALL, c) == c - True - >>> Region.ALL - Region.ALL - >>> print Region.ALL - Region.ALL - - .. py:attribute:: x - - The x coordinate of the left edge of the region, measured in pixels - from the left of the video frame (inclusive). - - .. py:attribute:: y - - The y coordinate of the top edge of the region, measured in pixels from - the top of the video frame (inclusive). - - .. py:attribute:: right - - The x coordinate of the right edge of the region, measured in pixels - from the left of the video frame (exclusive). - - .. py:attribute:: bottom - - The y coordinate of the bottom edge of the region, measured in pixels - from the top of the video frame (exclusive). - - ``x``, ``y``, ``right``, and ``bottom`` can be infinite -- that is, - ``float("inf")`` or ``-float("inf")``. 
- """ - def __new__(cls, x, y, width=None, height=None, right=None, bottom=None): - if (width is None) == (right is None): - raise ValueError("You must specify either 'width' or 'right'") - if (height is None) == (bottom is None): - raise ValueError("You must specify either 'height' or 'bottom'") - if right is None: - right = x + width - if bottom is None: - bottom = y + height - if right <= x: - raise ValueError("'right' must be greater than 'x'") - if bottom <= y: - raise ValueError("'bottom' must be greater than 'y'") - return super(Region, cls).__new__(cls, x, y, right, bottom) - - def __str__(self): - if self == Region.ALL: - return 'Region.ALL' - else: - return 'Region(x=%s, y=%s, width=%s, height=%s)' \ - % (self.x, self.y, self.width, self.height) - - def __repr__(self): - if self == Region.ALL: - return 'Region.ALL' - else: - return super(Region, self).__repr__() - - @property - def width(self): - """The width of the region, measured in pixels.""" - return self.right - self.x - - @property - def height(self): - """The height of the region, measured in pixels.""" - return self.bottom - self.y - - @staticmethod - def from_extents(x, y, right, bottom): - """Create a Region using right and bottom extents rather than width and - height. - - Deprecated since we added ``right`` and ``bottom`` to Region - constructor. - - >>> b = Region.from_extents(4, 4, 13, 10) - >>> print b - Region(x=4, y=4, width=9, height=6) - """ - return Region(x, y, right=right, bottom=bottom) - - @staticmethod - def intersect(a, b): - """ - :returns: The intersection of regions ``a`` and ``b``, or ``None`` if - the regions don't intersect. - - Either ``a`` or ``b`` can be ``None`` so intersect is commutative and - associative. 
- """ - if a is None or b is None: - return None - else: - extents = (max(a.x, b.x), max(a.y, b.y), - min(a.right, b.right), min(a.bottom, b.bottom)) - if extents[0] < extents[2] and extents[1] < extents[3]: - return Region.from_extents(*extents) - else: - return None - - def contains(self, other): - """:returns: True if ``other`` is entirely contained within self.""" - return (other and self.x <= other.x and self.y <= other.y and - self.right >= other.right and self.bottom >= other.bottom) - - def extend(self, x=0, y=0, right=0, bottom=0): - """ - :returns: A new region with the edges of the region adjusted by the - given amounts. - """ - return Region.from_extents( - self.x + x, self.y + y, self.right + right, self.bottom + bottom) - - def translate(self, x=0, y=0): - """ - :returns: A new region with the position of the region adjusted by the - given amounts. - """ - return Region.from_extents(self.x + x, self.y + y, - self.right + x, self.bottom + y) - -Region.ALL = Region(x=float("-inf"), y=float("-inf"), - right=float("inf"), bottom=float("inf")) - - -def _bounding_box(a, b): - """Find the bounding box of two regions. Returns the smallest region which - contains both regions a and b. - - >>> print _bounding_box(Region(50, 20, 10, 20), Region(20, 30, 10, 20)) - Region(x=20, y=20, width=40, height=30) - >>> print _bounding_box(Region(20, 30, 10, 20), Region(20, 30, 10, 20)) - Region(x=20, y=30, width=10, height=20) - >>> print _bounding_box(None, Region(20, 30, 10, 20)) - Region(x=20, y=30, width=10, height=20) - >>> print _bounding_box(Region(20, 30, 10, 20), None) - Region(x=20, y=30, width=10, height=20) - >>> print _bounding_box(None, None) - None - """ - if a is None: - return b - if b is None: - return a - return Region.from_extents(min(a.x, b.x), min(a.y, b.y), - max(a.right, b.right), max(a.bottom, b.bottom)) - - -class MatchResult(object): - """The result from `match`. - - * ``timestamp``: Video stream timestamp. 
-    * ``match``: Boolean result, the same as evaluating `MatchResult` as a bool.
-      That is, ``if match_result:`` will behave the same as
-      ``if match_result.match:``.
-    * ``region``: The `Region` in the video frame where the image was found.
-    * ``first_pass_result``: Value between 0 (poor) and 1.0 (excellent match)
-      from the first pass of stb-tester's two-pass image matching algorithm
-      (see `MatchParameters` for details).
-    * ``frame``: The video frame that was searched, in OpenCV format.
-    * ``image``: The template image that was searched for, as given to `match`.
-    """
-    # pylint: disable=W0621
-    def __init__(
-            self, timestamp, match, region, first_pass_result, frame=None,
-            image=None):
-        self.timestamp = timestamp
-        self.match = match
-        self.region = region
-        self.first_pass_result = first_pass_result
-        if frame is None:
-            warnings.warn(
-                "Creating a 'MatchResult' without specifying 'frame' is "
-                "deprecated. In a future release of stb-tester the 'frame' "
-                "parameter will be mandatory.",
-                DeprecationWarning, stacklevel=2)
-        self.frame = frame
-        if image is None:
-            warnings.warn(
-                "Creating a 'MatchResult' without specifying 'image' is "
-                "deprecated. In a future release of stb-tester the 'image' "
-                "parameter will be mandatory.",
-                DeprecationWarning, stacklevel=2)
-            image = ""
-        self.image = image
-
-    def __str__(self):
-        return (
-            "MatchResult(timestamp=%s, match=%s, region=%s, "
-            "first_pass_result=%s, frame=%s, image=%s)" % (
-                self.timestamp,
-                self.match,
-                self.region,
-                self.first_pass_result,
-                "None" if self.frame is None else "%dx%dx%d" % (
-                    self.frame.shape[1], self.frame.shape[0],
-                    self.frame.shape[2]),
-                "<Custom Image>" if isinstance(self.image, numpy.ndarray)
-                else repr(self.image)))
-
-    @property
-    def position(self):
-        return Position(self.region.x, self.region.y)
-
-    def __nonzero__(self):
-        return self.match
-
-
-class _AnnotatedTemplate(namedtuple('_AnnotatedTemplate',
-                                    'image name filename')):
-    @property
-    def friendly_name(self):
-        return self.filename or '<Custom Image>'
-
-
-def _load_template(template):
-    if isinstance(template, _AnnotatedTemplate):
-        return template
-    if isinstance(template, numpy.ndarray):
-        return _AnnotatedTemplate(template, None, None)
-    else:
-        template_name = _find_path(template)
-        if not os.path.isfile(template_name):
-            raise UITestError("No such template file: %s" % template_name)
-        image = cv2.imread(template_name, cv2.CV_LOAD_IMAGE_COLOR)
-        if image is None:
-            raise UITestError("Failed to load template file: %s" %
-                              template_name)
-        return _AnnotatedTemplate(image, template, template_name)
-
-
-def _crop(frame, region):
-    if not _image_region(frame).contains(region):
-        raise ValueError("'frame' doesn't contain 'region'")
-    return frame[region.y:region.bottom, region.x:region.right]
-
-
-def _image_region(image):
-    return Region(0, 0, image.shape[1], image.shape[0])
+    return _dut.draw_text(text, duration_secs)
 
 
 def match(image, frame=None, match_parameters=None, region=Region.ALL):
@@ -575,37 +136,7 @@
         A `MatchResult`, which will evaluate to true if a match was found,
         false otherwise.
     """
-    if match_parameters is None:
-        match_parameters = MatchParameters()
-
-    template = _load_template(image)
-
-    grabbed_from_live = (frame is None)
-    if grabbed_from_live:
-        frame = _display.get_sample()
-
-    with _numpy_from_sample(frame, readonly=True) as npframe:
-        region = Region.intersect(_image_region(npframe), region)
-
-        matched, match_region, first_pass_certainty = _match(
-            _crop(npframe, region), template.image, match_parameters,
-            template.friendly_name)
-
-        match_region = match_region.translate(region.x, region.y)
-        result = MatchResult(
-            _get_frame_timestamp(frame), matched, match_region,
-            first_pass_certainty, numpy.copy(npframe),
-            (template.name or template.image))
-
-    if grabbed_from_live:
-        _display.draw(result, None)
-
-    if result.match:
-        debug("Match found: %s" % str(result))
-    else:
-        debug("No match found. Closest match: %s" % str(result))
-
-    return result
+    return _dut.match(image, frame, match_parameters, region)
 
 
 def detect_match(image, timeout_secs=10, match_parameters=None):
@@ -621,24 +152,7 @@
     Specify `match_parameters` to customise the image matching algorithm. See
     the documentation for `MatchParameters` for details.
     """
-    template = _load_template(image)
-
-    debug("Searching for " + template.friendly_name)
-
-    for sample in _display.gst_samples(timeout_secs):
-        result = match(
-            template, frame=sample, match_parameters=match_parameters)
-        _display.draw(result, None)
-        yield result
-
-
-class MotionResult(namedtuple('MotionResult', 'timestamp motion')):
-    """The result from `detect_motion`.
-
-    * `timestamp`: Video stream timestamp.
-    * `motion`: Boolean result.
-    """
-    pass
+    return _dut.detect_match(image, timeout_secs, match_parameters)
 
 
 def detect_motion(timeout_secs=10, noise_threshold=None, mask=None):
@@ -670,71 +184,7 @@
     image to search for motion. White pixels select the area to search;
     black pixels select the area to ignore.
     """
-
-    if noise_threshold is None:
-        noise_threshold = get_config('motion', 'noise_threshold', type_=float)
-
-    debug("Searching for motion")
-
-    mask_image = None
-    if mask:
-        mask_image = _load_mask(mask)
-
-    previous_frame_gray = None
-    log = functools.partial(_log_image, directory="stbt-debug/detect_motion")
-
-    for sample in _display.gst_samples(timeout_secs):
-        with _numpy_from_sample(sample, readonly=True) as frame:
-            frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
-        log(frame_gray, "source")
-
-        if previous_frame_gray is None:
-            if (mask_image is not None and
-                    mask_image.shape[:2] != frame_gray.shape[:2]):
-                raise UITestError(
-                    "The dimensions of the mask '%s' %s don't match the video "
-                    "frame %s" % (mask, mask_image.shape, frame_gray.shape))
-            previous_frame_gray = frame_gray
-            continue
-
-        absdiff = cv2.absdiff(frame_gray, previous_frame_gray)
-        previous_frame_gray = frame_gray
-        log(absdiff, "absdiff")
-
-        if mask_image is not None:
-            absdiff = cv2.bitwise_and(absdiff, mask_image)
-            log(mask_image, "mask")
-            log(absdiff, "absdiff_masked")
-
-        _, thresholded = cv2.threshold(
-            absdiff, int((1 - noise_threshold) * 255), 255, cv2.THRESH_BINARY)
-        eroded = cv2.erode(
-            thresholded,
-            cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3)))
-        log(thresholded, "absdiff_threshold")
-        log(eroded, "absdiff_threshold_erode")
-
-        motion = (cv2.countNonZero(eroded) > 0)
-
-        # Visualisation: Highlight in red the areas where we detected motion
-        if motion:
-            with _numpy_from_sample(sample) as frame:
-                cv2.add(
-                    frame,
-                    numpy.multiply(
-                        numpy.ones(frame.shape, dtype=numpy.uint8),
-                        (0, 0, 255),  # bgr
-                        dtype=numpy.uint8),
-                    mask=cv2.dilate(
-                        thresholded,
-                        cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3)),
-                        iterations=1),
-                    dst=frame)
-
-        result = MotionResult(sample.get_buffer().pts, motion)
-        debug("%s found: %s" % (
-            "Motion" if motion else "No motion", str(result)))
-        yield result
+    return _dut.detect_motion(timeout_secs, noise_threshold, mask)
 
 
 def wait_for_match(image, timeout_secs=10, consecutive_matches=1,
@@ -759,25 +209,8 @@
     :returns: `MatchResult` when the image is found.
    :raises: `MatchTimeout` if no match is found after ``timeout_secs`` seconds.
    """
-
-    if match_parameters is None:
-        match_parameters = MatchParameters()
-
-    match_count = 0
-    last_pos = Position(0, 0)
-    image = _load_template(image)
-    for res in detect_match(
-            image, timeout_secs, match_parameters=match_parameters):
-        if res.match and (match_count == 0 or res.position == last_pos):
-            match_count += 1
-        else:
-            match_count = 0
-        last_pos = res.position
-        if match_count == consecutive_matches:
-            debug("Matched " + image.friendly_name)
-            return res
-
-    raise MatchTimeout(res.frame, image.friendly_name, timeout_secs)  # pylint: disable=W0631,C0301
+    return _dut.wait_for_match(
+        image, timeout_secs, consecutive_matches, match_parameters)
 
 
 def press_until_match(
@@ -812,28 +245,8 @@
     :returns: `MatchResult` when the image is found.
    :raises: `MatchTimeout` if no match is found after ``timeout_secs`` seconds.
    """
-    if interval_secs is None:
-        # Should this be float?
-        interval_secs = get_config(
-            "press_until_match", "interval_secs", type_=int)
-    if max_presses is None:
-        max_presses = get_config("press_until_match", "max_presses", type_=int)
-
-    if match_parameters is None:
-        match_parameters = MatchParameters()
-
-    i = 0
-
-    while True:
-        try:
-            return wait_for_match(image, timeout_secs=interval_secs,
-                                  match_parameters=match_parameters)
-        except MatchTimeout:
-            if i < max_presses:
-                press(key)
-                i += 1
-            else:
-                raise
+    return _dut.press_until_match(
+        key, image, interval_secs, max_presses, match_parameters)
 
 
 def wait_for_motion(
@@ -868,232 +281,8 @@
     :raises: `MotionTimeout` if no motion is detected after ``timeout_secs``
        seconds.
     """
-
-    if consecutive_frames is None:
-        consecutive_frames = get_config('motion', 'consecutive_frames')
-
-    consecutive_frames = str(consecutive_frames)
-    if '/' in consecutive_frames:
-        motion_frames = int(consecutive_frames.split('/')[0])
-        considered_frames = int(consecutive_frames.split('/')[1])
-    else:
-        motion_frames = int(consecutive_frames)
-        considered_frames = int(consecutive_frames)
-
-    if motion_frames > considered_frames:
-        raise ConfigurationError(
-            "`motion_frames` exceeds `considered_frames`")
-
-    debug("Waiting for %d out of %d frames with motion" % (
-        motion_frames, considered_frames))
-
-    matches = deque(maxlen=considered_frames)
-    for res in detect_motion(timeout_secs, noise_threshold, mask):
-        matches.append(res.motion)
-        if matches.count(True) >= motion_frames:
-            debug("Motion detected.")
-            return res
-
-    screenshot = get_frame()
-    raise MotionTimeout(screenshot, mask, timeout_secs)
-
-
-class OcrMode(IntEnum):
-    """Options to control layout analysis and assume a certain form of image.
-
-    For a (brief) description of each option, see the `tesseract(1)
-    `_
-    man page.
-    """
-    ORIENTATION_AND_SCRIPT_DETECTION_ONLY = 0
-    PAGE_SEGMENTATION_WITH_OSD = 1
-    PAGE_SEGMENTATION_WITHOUT_OSD_OR_OCR = 2
-    PAGE_SEGMENTATION_WITHOUT_OSD = 3
-    SINGLE_COLUMN_OF_TEXT_OF_VARIABLE_SIZES = 4
-    SINGLE_UNIFORM_BLOCK_OF_VERTICALLY_ALIGNED_TEXT = 5
-    SINGLE_UNIFORM_BLOCK_OF_TEXT = 6
-    SINGLE_LINE = 7
-    SINGLE_WORD = 8
-    SINGLE_WORD_IN_A_CIRCLE = 9
-    SINGLE_CHARACTER = 10
-
-    # For nicer formatting of `ocr` signature in generated API documentation:
-    def __repr__(self):
-        return str(self)
-
-
-# Tesseract sometimes has a hard job distinguishing certain glyphs such as
-# ligatures and different forms of the same punctuation. We strip out this
-# superfluous information improving matching accuracy with minimal effect on
-# meaning. This means that stbt.ocr give much more consistent results.
-_ocr_replacements = {
-    # Ligatures
-    u'ﬀ': u'ff',
-    u'ﬁ': u'fi',
-    u'ﬂ': u'fl',
-    u'ﬃ': u'ffi',
-    u'ﬄ': u'ffl',
-    u'ﬅ': u'ft',
-    u'ﬆ': u'st',
-    # Punctuation
-    u'“': u'"',
-    u'”': u'"',
-    u'‘': u'\'',
-    u'’': u'\'',
-    # These are actually different glyphs!:
-    u'‐': u'-',
-    u'‑': u'-',
-    u'‒': u'-',
-    u'–': u'-',
-    u'—': u'-',
-    u'―': u'-',
-}
-_ocr_transtab = dict((ord(amb), to) for amb, to in _ocr_replacements.items())
-
-
-def _find_tessdata_dir():
-    from distutils.spawn import find_executable
-
-    tessdata_prefix = os.environ.get("TESSDATA_PREFIX", None)
-    if tessdata_prefix:
-        tessdata = tessdata_prefix + '/tessdata'
-        if os.path.exists(tessdata):
-            return tessdata
-        else:
-            raise RuntimeError('Invalid TESSDATA_PREFIX: %s' % tessdata_prefix)
-
-    tess_prefix_share = os.path.normpath(
-        find_executable('tesseract') + '/../../share/')
-    for suffix in [
-            '/tessdata', '/tesseract-ocr/tessdata', '/tesseract/tessdata']:
-        if os.path.exists(tess_prefix_share + suffix):
-            return tess_prefix_share + suffix
-    raise RuntimeError('Installation error: Cannot locate tessdata directory')
-
-
-def _symlink_copy_dir(a, b):
-    """Behaves like `cp -rs` with GNU cp but is portable and doesn't require
-    execing another process. Tesseract requires files in the "tessdata"
-    directory to be modified to set config options. tessdata may be on a
-    read-only system directory so we use this to work around that limitation.
-    """
-    from os.path import basename, join, relpath
-    newroot = join(b, basename(a))
-    for dirpath, dirnames, filenames in os.walk(a):
-        for name in dirnames:
-            if name not in ['.', '..']:
-                rel = relpath(join(dirpath, name), a)
-                os.mkdir(join(newroot, rel))
-        for name in filenames:
-            rel = relpath(join(dirpath, name), a)
-            os.symlink(join(a, rel), join(newroot, rel))
-
-_memoise_tesseract_version = None
-
-
-def _tesseract_version(output=None):
-    r"""Different versions of tesseract have different bugs. This function
-    allows us to tell the user if what they want isn't going to work.
-
-    >>> (_tesseract_version('tesseract 3.03\n leptonica-1.70\n') >
-    ...  _tesseract_version('tesseract 3.02\n'))
-    True
-    """
-    global _memoise_tesseract_version
-    if output is None:
-        if _memoise_tesseract_version is None:
-            _memoise_tesseract_version = subprocess.check_output(
-                ['tesseract', '--version'], stderr=subprocess.STDOUT)
-        output = _memoise_tesseract_version
-
-    line = [x for x in output.split('\n') if x.startswith('tesseract')][0]
-    return LooseVersion(line.split()[1])
-
-
-def _tesseract(frame, region, mode, lang, _config,
-               user_patterns=None, user_words=None):
-
-    if _config is None:
-        _config = {}
-
-    with _numpy_from_sample(frame, readonly=True) as f:
-        frame_region = Region(0, 0, f.shape[1], f.shape[0])
-        intersection = Region.intersect(frame_region, region)
-        if intersection is None:
-            warn("Requested OCR in region %s which doesn't overlap with "
-                 "the frame %s" % (str(region), frame_region))
-            return ('', None)
-        else:
-            region = intersection
-
-        # We scale image up 3x before feeding it to tesseract as this
-        # significantly reduces the error rate by more than 6x in tests. This
-        # uses bilinear interpolation which produces the best results. See
-        # http://stb-tester.com/blog/2014/04/14/improving-ocr-accuracy.html
-        outsize = (region.width * 3, region.height * 3)
-        subframe = cv2.resize(_crop(f, region), outsize,
-                              interpolation=cv2.INTER_LINEAR)
-
-    # $XDG_RUNTIME_DIR is likely to be on tmpfs:
-    tmpdir = os.environ.get("XDG_RUNTIME_DIR", None)
-
-    # The second argument to tesseract is "output base" which is a filename to
-    # which tesseract will append an extension. Unfortunately this filename
-    # isn't easy to predict in advance across different versions of tesseract.
-    # If you give it "hello" the output will be written to "hello.txt", but in
-    # hOCR mode it will be "hello.html" (tesseract 3.02) or "hello.hocr"
-    # (tesseract 3.03). We work around this with a temporary directory:
-    with utils.named_temporary_directory(prefix='stbt-ocr-', dir=tmpdir) as tmp:
-        outdir = tmp + '/output'
-        os.mkdir(outdir)
-
-        cmd = ["tesseract", '-l', lang, tmp + '/input.png',
-               outdir + '/output', "-psm", str(int(mode))]
-
-        tessenv = os.environ.copy()
-
-        if _config or user_words or user_patterns:
-            tessdata_dir = tmp + '/tessdata'
-            os.mkdir(tessdata_dir)
-            _symlink_copy_dir(_find_tessdata_dir(), tmp)
-            tessenv['TESSDATA_PREFIX'] = tmp + '/'
-
-        if user_words:
-            if 'user_words_suffix' in _config:
-                raise ValueError(
-                    "You cannot specify 'user_words' and " +
-                    "'_config[\"user_words_suffix\"]' at the same time")
-            with open('%s/%s.user-words' % (tessdata_dir, lang), 'w') as f:
-                f.write('\n'.join(user_words).encode('utf-8'))
-            _config['user_words_suffix'] = 'user-words'
-
-        if user_patterns:
-            if 'user_patterns_suffix' in _config:
-                raise ValueError(
-                    "You cannot specify 'user_patterns' and " +
-                    "'_config[\"user_patterns_suffix\"]' at the same time")
-            if _tesseract_version() < LooseVersion('3.03'):
-                raise RuntimeError(
-                    'tesseract version >=3.03 is required for user_patterns. '
-                    'version %s is currently installed' % _tesseract_version())
-            with open('%s/%s.user-patterns' % (tessdata_dir, lang), 'w') as f:
-                f.write('\n'.join(user_patterns).encode('utf-8'))
-            _config['user_patterns_suffix'] = 'user-patterns'
-
-        if _config:
-            with open(tessdata_dir + '/configs/stbtester', 'w') as cfg:
-                for k, v in _config.iteritems():
-                    if isinstance(v, bool):
-                        cfg.write(('%s %s\n' % (k, 'T' if v else 'F')))
-                    else:
-                        cfg.write((u"%s %s\n" % (k, unicode(v)))
-                                  .encode('utf-8'))
-            cmd += ['stbtester']
-
-        cv2.imwrite(tmp + '/input.png', subframe)
-        subprocess.check_output(cmd, stderr=subprocess.STDOUT, env=tessenv)
-        with open(outdir + '/' + os.listdir(outdir)[0], 'r') as outfile:
-            return (outfile.read(), region)
+    return _dut.wait_for_motion(
+        timeout_secs, consecutive_frames, noise_threshold, mask)
 
 
 def ocr(frame=None, region=Region.ALL,
@@ -1154,99 +343,8 @@
         \*
         *
     """
-    if frame is None:
-        frame = _display.get_sample()
-
-    if region is None:
-        warnings.warn(
-            "Passing region=None to ocr is deprecated since 0.21 and the "
-            "meaning will change in a future version. To OCR an entire video "
-            "frame pass region=Region.ALL instead",
-            DeprecationWarning, stacklevel=2)
-        region = Region.ALL
-
-    text, region = _tesseract(
-        frame, region, mode, lang, tesseract_config,
-        user_patterns=tesseract_user_patterns, user_words=tesseract_user_words)
-    text = text.decode('utf-8').strip().translate(_ocr_transtab)
-    debug(u"OCR in region %s read '%s'." % (region, text))
-    return text
-
-
-def _hocr_iterate(hocr):
-    started = False
-    need_space = False
-    for elem in hocr.iterdescendants():
-        if elem.tag == '{http://www.w3.org/1999/xhtml}p' and started:
-            yield (u'\n', elem)
-            need_space = False
-        if elem.tag == '{http://www.w3.org/1999/xhtml}span' and \
-                'ocr_line' in elem.get('class').split() and started:
-            yield (u'\n', elem)
-            need_space = False
-        for e, t in [(elem, elem.text), (elem.getparent(), elem.tail)]:
-            if t:
-                if t.strip():
-                    if need_space and started:
-                        yield (u' ', None)
-                    need_space = False
-                    yield (unicode(t).strip(), e)
-                    started = True
-                else:
-                    need_space = True
-
-
-def _hocr_find_phrase(hocr, phrase):
-    l = list(_hocr_iterate(hocr))
-    words_only = [(w, elem) for w, elem in l if w.strip() != u'']
-
-    # Dumb and poor algorithmic complexity but succint and simple
-    if len(phrase) <= len(words_only):
-        for x in range(0, len(words_only)):
-            sublist = words_only[x:x + len(phrase)]
-            if all(w[0].lower() == p.lower() for w, p in zip(sublist, phrase)):
-                return sublist
-    return None
-
-
-def _hocr_elem_region(elem):
-    while elem is not None:
-        m = re.search(r'bbox (\d+) (\d+) (\d+) (\d+)', elem.get('title') or u'')
-        if m:
-            extents = [int(x) for x in m.groups()]
-            return Region.from_extents(*extents)
-        elem = elem.getparent()
-
-
-class TextMatchResult(namedtuple(
-        "TextMatchResult", "timestamp match region frame text")):
-
-    """The result from `match_text`.
-
-    * ``timestamp``: Video stream timestamp.
-    * ``match``: Boolean result, the same as evaluating `TextMatchResult` as a
-      bool. That is, ``if result:`` will behave the same as
-      ``if result.match:``.
-    * ``region``: The `Region` (bounding box) of the text found, or ``None`` if
-      no text was found.
-    * ``frame``: The video frame that was searched, in OpenCV format.
-    * ``text``: The text (unicode string) that was searched for, as given to
-      `match_text`.
-    """
-    # pylint: disable=E1101
-    def __nonzero__(self):
-        return self.match
-
-    def __str__(self):
-        return (
-            "TextMatchResult(timestamp=%s, match=%s, region=%s, frame=%s, "
-            "text=%s)" % (
-                self.timestamp,
-                self.match,
-                self.region,
-                "%dx%dx%d" % (self.frame.shape[1], self.frame.shape[0],
-                              self.frame.shape[2]),
-                repr(self.text)))
+    return _dut.ocr(frame, region, mode, lang, tesseract_config,
+                    tesseract_user_words, tesseract_user_patterns)
 
 
 def match_text(text, frame=None, region=Region.ALL,
@@ -1277,33 +375,8 @@
         stbt.press('KEY_DOWN')
 
     """
-    import lxml.etree
-    if frame is None:
-        frame = get_frame()
-
-    _config = dict(tesseract_config or {})
-    _config['tessedit_create_hocr'] = 1
-
-    ts = _get_frame_timestamp(frame)
-
-    xml, region = _tesseract(frame, region, mode, lang, _config)
-    if xml == '':
-        return TextMatchResult(ts, False, None, frame, text)
-    hocr = lxml.etree.fromstring(xml)
-    p = _hocr_find_phrase(hocr, text.split())
-    if p:
-        # Find bounding box
-        box = None
-        for _, elem in p:
-            box = _bounding_box(box, _hocr_elem_region(elem))
-        # _tesseract crops to region and scales up by a factor of 3 so we must
-        # undo this transformation here.
-        box = Region.from_extents(
-            region.x + box.x // 3, region.y + box.y // 3,
-            region.x + box.right // 3, region.y + box.bottom // 3)
-        return TextMatchResult(ts, True, box, frame, text)
-    else:
-        return TextMatchResult(ts, False, None, frame, text)
+    return _dut.match_text(
+        text, frame, region, mode, lang, tesseract_config)
 
 
 def frames(timeout_secs=None):
@@ -1321,22 +394,12 @@
        An ``(image, timestamp)`` tuple for each video frame, where ``image``
        is a `numpy.ndarray` object (that is, an OpenCV image).
    """
-    return _display.frames(timeout_secs)
-
-
-def save_frame(image, filename):
-    """Saves an OpenCV image to the specified file.
-
-    Takes an image obtained from `get_frame` or from the `screenshot`
-    property of `MatchTimeout` or `MotionTimeout`.
-    """
-    cv2.imwrite(filename, image)
+    return _dut.frames(timeout_secs)
 
 
 def get_frame():
     """:returns: The latest video frame in OpenCV format (`numpy.ndarray`)."""
-    with _numpy_from_sample(_display.get_sample(), readonly=True) as frame:
-        return frame.copy()
+    return _dut.get_frame()
 
 
 def is_screen_black(frame=None, mask=None, threshold=None):
@@ -1358,1271 +421,18 @@
        range 0 (black) to 255 (white). The global default can be changed by
        setting `threshold` in the `[is_screen_black]` section of stbt.conf.
    """
-    if threshold is None:
-        threshold = get_config('is_screen_black', 'threshold', type_=int)
-    if mask:
-        mask = _load_mask(mask)
-    if frame is None:
-        frame = _display.get_sample()
-    with _numpy_from_sample(frame, readonly=True) as f:
-        greyframe = cv2.cvtColor(f, cv2.COLOR_BGR2GRAY)
-    _, greyframe = cv2.threshold(greyframe, threshold, 255, cv2.THRESH_BINARY)
-    _, maxVal, _, _ = cv2.minMaxLoc(greyframe, mask)
-    if logging.get_debug_level() > 1:
-        _log_image(frame, 'source', 'stbt-debug/is_screen_black')
-        if mask is not None:
-            _log_image(mask, 'mask', 'stbt-debug/is_screen_black')
-            _log_image(numpy.bitwise_and(greyframe, mask),
-                       'non-black-regions-after-masking',
-                       'stbt-debug/is_screen_black')
-        else:
-            _log_image(greyframe, 'non-black-regions-after-masking',
-                       'stbt-debug/is_screen_black')
-    return maxVal == 0
-
-
-def wait_until(callable_, timeout_secs=10, interval_secs=0):
-    """Wait until a condition becomes true, or until a timeout.
-
-    ``callable_`` is any python callable, such as a function or a lambda
-    expression. It will be called repeatedly (with a delay of ``interval_secs``
-    seconds between successive calls) until it succeeds (that is, it returns a
-    truthy value) or until ``timeout_secs`` seconds have passed. In both cases,
-    ``wait_until`` returns the value that ``callable_`` returns.
-
-    After you send a remote-control signal to the system-under-test it usually
-    takes a few frames to react, so a test script like this would probably
-    fail::
-
-        press("KEY_EPG")
-        assert match("guide.png")
-
-    Instead, use this::
-
-        press("KEY_EPG")
-        assert wait_until(lambda: match("guide.png"))
-
-    Note that instead of the above `assert wait_until(...)` you could use
-    `wait_for_match("guide.png")`. `wait_until` is a generic solution that
-    also works with stbt's other functions, like `match_text` and
-    `is_screen_black`.
-
-    `wait_until` allows composing more complex conditions, such as::
-
-        # Wait until something disappears:
-        assert wait_until(lambda: not match("xyz.png"))
-
-        # Assert that something doesn't appear within 10 seconds:
-        assert not wait_until(lambda: match("xyz.png"))
-
-        # Assert that two images are present at the same time:
-        assert wait_until(lambda: match("a.png") and match("b.png"))
-
-        # Wait but don't raise an exception:
-        if not wait_until(lambda: match("xyz.png")):
-            do_something_else()
-
-    There are some drawbacks to using `assert` instead of `wait_for_match`:
-
-    * The exception message won't contain the reason why the match failed
-      (unless you specify it as a second parameter to `assert`, which is
-      tedious and we don't expect you to do it), and
-    * The exception won't have the offending video-frame attached (so the
-      screenshot in the test-run artifacts will be a few frames later than the
-      frame that actually caused the test to fail).
-
-    We hope to solve both of the above drawbacks at some point in the future.
-    """
-    expiry_time = time.time() + timeout_secs
-    while True:
-        e = callable_()
-        if e:
-            return e
-        if time.time() > expiry_time:
-            debug("wait_until timed out: %s" % _callable_description(callable_))
-            return e
-        time.sleep(interval_secs)
-
-
-def _callable_description(callable_):
-    """Helper to provide nicer debug output when `wait_until` fails.
-
-    >>> _callable_description(wait_until)
-    'wait_until'
-    >>> _callable_description(
-    ...     lambda: stbt.press("OK"))
-    '    lambda: stbt.press("OK"))\\n'
-    """
-    d = callable_.__name__
-    if d == "<lambda>":
-        try:
-            d = inspect.getsource(callable_)
-        except IOError:
-            pass
-    return d
-
-
-@contextmanager
-def as_precondition(message):
-    """Context manager that replaces test failures with test errors.
-
-    Stb-tester's reports show test failures (that is, `UITestFailure` or
-    `AssertionError` exceptions) as red results, and test errors (that is,
-    unhandled exceptions of any other type) as yellow results. Note that
-    `wait_for_match`, `wait_for_motion`, and similar functions raise a
-    `UITestFailure` when they detect a failure. By running such functions
-    inside an `as_precondition` context, any `UITestFailure` or
-    `AssertionError` exceptions they raise will be caught, and a
-    `PreconditionError` will be raised instead.
-
-    When running a single testcase hundreds or thousands of times to reproduce
-    an intermittent defect, it is helpful to mark unrelated failures as test
-    errors (yellow) rather than test failures (red), so that you can focus on
-    diagnosing the failures that are most likely to be the particular defect
-    you are looking for. For more details see `Test failures vs. errors
-    `_.
-
-    :param str message:
-        A description of the precondition. Word this positively: "Channels
-        tuned", not "Failed to tune channels".
-
-    :raises:
-        `PreconditionError` if the wrapped code block raises a `UITestFailure`
-        or `AssertionError`.
-
-    Example::
-
-        def test_that_the_on_screen_id_is_shown_after_booting():
-            channel = 100
-
-            with stbt.as_precondition("Tuned to channel %s" % channel):
-                mainmenu.close_any_open_menu()
-                channels.goto_channel(channel)
-                power.cold_reboot()
-                assert channels.is_on_channel(channel)
-
-            stbt.wait_for_match("on-screen-id.png")
-
-    """
-    try:
-        yield
-    except (UITestFailure, AssertionError) as e:
-        debug("stbt.as_precondition caught a %s exception and will "
-              "re-raise it as PreconditionError.\nOriginal exception was:\n%s"
-              % (type(e).__name__, traceback.format_exc(e)))
-        exc = PreconditionError(message, e)
-        if hasattr(e, 'screenshot'):
-            exc.screenshot = e.screenshot  # pylint: disable=W0201
-        raise exc
-
-
-class UITestError(Exception):
-    """The test script had an unrecoverable error."""
-    pass
-
-
-class UITestFailure(Exception):
-    """The test failed because the system under test didn't behave as expected.
-
-    Inherit from this if you need to define your own test-failure exceptions.
-    """
-    pass
-
-
-class NoVideo(UITestFailure):
-    """No video available from the source pipeline."""
-    pass
-
-
-class MatchTimeout(UITestFailure):
-    """Exception raised by `wait_for_match`.
-
-    * ``screenshot``: The last video frame that `wait_for_match` checked before
-      timing out.
-    * ``expected``: Filename of the image that was being searched for.
-    * ``timeout_secs``: Number of seconds that the image was searched for.
-    """
-    def __init__(self, screenshot, expected, timeout_secs):
-        super(MatchTimeout, self).__init__()
-        self.screenshot = screenshot
-        self.expected = expected
-        self.timeout_secs = timeout_secs
-
-    def __str__(self):
-        return "Didn't find match for '%s' within %g seconds." % (
-            self.expected, self.timeout_secs)
-
-
-class MotionTimeout(UITestFailure):
-    """Exception raised by `wait_for_motion`.
-
-    * ``screenshot``: The last video frame that `wait_for_motion` checked before
-      timing out.
-    * ``mask``: Filename of the mask that was used, if any.
- * ``timeout_secs``: Number of seconds that motion was searched for. - """ - def __init__(self, screenshot, mask, timeout_secs): - super(MotionTimeout, self).__init__() - self.screenshot = screenshot - self.mask = mask - self.timeout_secs = timeout_secs - - def __str__(self): - return "Didn't find motion%s within %g seconds." % ( - " (with mask '%s')" % self.mask if self.mask else "", - self.timeout_secs) - - -class PreconditionError(UITestError): - """Exception raised by `as_precondition`.""" - def __init__(self, message, original_exception): - super(PreconditionError, self).__init__() - self.message = message - self.original_exception = original_exception - - def __str__(self): - return ( - "Didn't meet precondition '%s' (original exception was: %s)" - % (self.message, self.original_exception)) - - -# stbt-run initialisation and convenience functions -# (you will need these if writing your own version of stbt-run) -# =========================================================================== - -def argparser(): - parser = argparse.ArgumentParser() - parser.add_argument( - '--control', - default=get_config('global', 'control'), - help='The remote control to control the stb (default: %(default)s)') - parser.add_argument( - '--source-pipeline', - default=get_config('global', 'source_pipeline'), - help='A gstreamer pipeline to use for A/V input (default: ' - '%(default)s)') - parser.add_argument( - '--sink-pipeline', - default=get_config('global', 'sink_pipeline'), - help='A gstreamer pipeline to use for video output ' - '(default: %(default)s)') - parser.add_argument( - '--restart-source', action='store_true', - default=(get_config('global', 'restart_source').lower() in - ("1", "yes", "true", "on")), - help='Restart the GStreamer source pipeline when video loss is ' - 'detected') - - logging.argparser_add_verbose_argument(parser) - - return parser + return _dut.is_screen_black(frame, mask, threshold) def init_run( gst_source_pipeline, gst_sink_pipeline, 
control_uri, save_video=False, restart_source=False, transformation_pipeline='identity'): - global _display, _control - _display = Display( - gst_source_pipeline, gst_sink_pipeline, - save_video, restart_source, transformation_pipeline) - _control = control.uri_to_remote(control_uri, _display) + global _dut + _dut = _stbt.core.new_device_under_test_from_config( + gst_source_pipeline, gst_sink_pipeline, control_uri, save_video, + restart_source, transformation_pipeline) + _dut.__enter__() def teardown_run(): - if _display: - _display.teardown() - - -# Internal -# =========================================================================== - -if hasattr(GLib.MainLoop, 'new'): - _mainloop = GLib.MainLoop.new(context=None, is_running=False) -else: - # Ubuntu 12.04 (Travis) support: PyGObject <3.7.2 doesn't expose the "new" - # constructor we'd like to be using, so fall back to __init__. This means - # Ctrl-C is broken on 12.04 and threading will behave differently on Travis - # than on our supported systems. - _mainloop = GLib.MainLoop() - -_display = None -_control = None - - -def _gst_sample_make_writable(sample): - if sample.get_buffer().mini_object.is_writable(): - return sample - else: - return Gst.Sample.new( - sample.get_buffer().copy_region( - Gst.BufferCopyFlags.FLAGS | Gst.BufferCopyFlags.TIMESTAMPS | - Gst.BufferCopyFlags.META | Gst.BufferCopyFlags.MEMORY, 0, - sample.get_buffer().get_size()), - sample.get_caps(), - sample.get_segment(), - sample.get_info()) - - -@contextmanager -def _numpy_from_sample(sample, readonly=False): - """ - Allow the contents of a GstSample to be read (and optionally changed) as a - numpy array. The provided numpy array is a view onto the contents of the - GstBuffer in the sample provided. The data is only valid within the `with:` - block where this contextmanager is used so the provided array should not - be referenced outside the `with:` block. 
If you want to use it elsewhere
-    either copy the data with `numpy.ndarray.copy()` or reference the GstSample
-    directly.
-
-    A `numpy.ndarray` may be passed as sample, in which case this
-    contextmanager is a no-op. This makes it easier to create functions which
-    will accept either numpy arrays or GstSamples, providing a migration path
-    for reducing copying in stb-tester.
-
-    :param sample: Either a GstSample or a `numpy.ndarray` containing the data
-                   you wish to manipulate as a `numpy.ndarray`
-    :param readonly: bool. Determines whether you want to just read or change
-                     the data contained within sample. If False, the GstSample
-                     passed must be writable or ValueError will be raised.
-                     Use `_gst_sample_make_writable` to get a writable
-                     `GstSample`.
-
-    >>> s = Gst.Sample.new(Gst.Buffer.new_wrapped("hello"),
-    ...                    Gst.Caps.from_string("video/x-raw"), None, None)
-    >>> with _numpy_from_sample(s) as a:
-    ...     print a
-    [104 101 108 108 111]
-    """
-    if isinstance(sample, numpy.ndarray):
-        yield sample
-        return
-    if not isinstance(sample, Gst.Sample):
-        raise TypeError("_numpy_from_sample must take a Gst.Sample or a "
-                        "numpy.ndarray. 
Received a %s" % str(type(sample))) - - caps = sample.get_caps() - flags = Gst.MapFlags.READ - if not readonly: - flags |= Gst.MapFlags.WRITE - - with map_gst_buffer(sample.get_buffer(), flags) as buf: - array = numpy.frombuffer((buf), dtype=numpy.uint8) - array.flags.writeable = not readonly - if caps.get_structure(0).get_value('format') in ['BGR', 'RGB']: - array.shape = (caps.get_structure(0).get_value('height'), - caps.get_structure(0).get_value('width'), - 3) - yield array - - -def _test_that_mapping_a_sample_readonly_gives_a_readonly_array(): - Gst.init([]) - s = Gst.Sample.new(Gst.Buffer.new_wrapped("hello"), - Gst.Caps.from_string("video/x-raw"), None, None) - with _numpy_from_sample(s, readonly=True) as ro: - try: - ro[0] = 3 - assert False, 'Writing elements should have thrown' - except (ValueError, RuntimeError): - # Different versions of numpy raise different exceptions - pass - - -def _test_passing_a_numpy_ndarray_as_sample_is_a_noop(): - a = numpy.ndarray((5, 2)) - with _numpy_from_sample(a) as m: - assert a is m - - -def _test_that_dimensions_of_array_are_according_to_caps(): - s = Gst.Sample.new(Gst.Buffer.new_wrapped( - "row 1 4 px row 2 4 px row 3 4 px "), - Gst.Caps.from_string("video/x-raw,format=BGR,width=4,height=3"), - None, None) - with _numpy_from_sample(s, readonly=True) as a: - assert a.shape == (3, 4, 3) - - -def _get_frame_timestamp(frame): - if isinstance(frame, Gst.Sample): - return frame.get_buffer().pts - else: - return None - - -class Display(object): - def __init__(self, user_source_pipeline, user_sink_pipeline, - save_video, - restart_source=False, - transformation_pipeline='identity'): - self.novideo = False - self.lock = threading.RLock() # Held by whoever is consuming frames - self.last_sample = Queue.Queue(maxsize=1) - self.source_pipeline = None - self.start_timestamp = None - self.underrun_timeout = None - self.text_annotations = [] - self.match_annotations = [] - self.tearing_down = False - - self.restart_source_enabled = 
restart_source - - appsink = ( - "appsink name=appsink max-buffers=1 drop=false sync=true " - "emit-signals=true " - "caps=video/x-raw,format=BGR") - # Notes on the source pipeline: - # * _stbt_raw_frames_queue is kept small to reduce the amount of slack - # (and thus the latency) of the pipeline. - # * _stbt_user_data_queue before the decodebin is large. We don't want - # to drop encoded packets as this will cause significant image - # artifacts in the decoded buffers. We make the assumption that we - # have enough horse-power to decode the incoming stream and any delays - # will be transient otherwise it could start filling up causing - # increased latency. - self.source_pipeline_description = " ! ".join([ - user_source_pipeline, - 'queue name=_stbt_user_data_queue max-size-buffers=0 ' - ' max-size-bytes=0 max-size-time=10000000000', - "decodebin", - 'queue name=_stbt_raw_frames_queue max-size-buffers=2', - 'videoconvert', - 'video/x-raw,format=BGR', - transformation_pipeline, - appsink]) - self.create_source_pipeline() - - if save_video: - if not save_video.endswith(".webm"): - save_video += ".webm" - debug("Saving video to '%s'" % save_video) - video_pipeline = ( - "t. ! queue leaky=downstream ! videoconvert ! " - "vp8enc cpu-used=6 min_quantizer=32 max_quantizer=32 ! " - "webmmux ! filesink location=%s" % save_video) - else: - video_pipeline = "" - - sink_pipeline_description = " ".join([ - "appsrc name=appsrc format=time " + - "caps=video/x-raw,format=(string)BGR !", - "tee name=t", - video_pipeline, - "t. ! queue leaky=downstream ! 
videoconvert !", - user_sink_pipeline - ]) - - self.sink_pipeline = Gst.parse_launch(sink_pipeline_description) - sink_bus = self.sink_pipeline.get_bus() - sink_bus.connect( - "message::error", - lambda bus, msg: self.on_error(self.sink_pipeline, bus, msg)) - sink_bus.connect("message::warning", self.on_warning) - sink_bus.connect("message::eos", self.on_eos_from_sink_pipeline) - sink_bus.add_signal_watch() - self.appsrc = self.sink_pipeline.get_by_name("appsrc") - - debug("source pipeline: %s" % self.source_pipeline_description) - debug("sink pipeline: %s" % sink_pipeline_description) - - self.source_pipeline.set_state(Gst.State.PLAYING) - self.sink_pipeline.set_state(Gst.State.PLAYING) - - self.mainloop_thread = threading.Thread(target=_mainloop.run) - self.mainloop_thread.daemon = True - self.mainloop_thread.start() - - def create_source_pipeline(self): - self.source_pipeline = Gst.parse_launch( - self.source_pipeline_description) - source_bus = self.source_pipeline.get_bus() - source_bus.connect( - "message::error", - lambda bus, msg: self.on_error(self.source_pipeline, bus, msg)) - source_bus.connect("message::warning", self.on_warning) - source_bus.connect("message::eos", self.on_eos_from_source_pipeline) - source_bus.add_signal_watch() - appsink = self.source_pipeline.get_by_name("appsink") - appsink.connect("new-sample", self.on_new_sample) - - if self.restart_source_enabled: - # Handle loss of video (but without end-of-stream event) from the - # Hauppauge HDPVR capture device. 
- source_queue = self.source_pipeline.get_by_name( - "_stbt_user_data_queue") - self.start_timestamp = None - source_queue.connect("underrun", self.on_underrun) - source_queue.connect("running", self.on_running) - - if (self.source_pipeline.set_state(Gst.State.PAUSED) - == Gst.StateChangeReturn.NO_PREROLL): - # This is a live source, drop frames if we get behind - self.source_pipeline.get_by_name('_stbt_raw_frames_queue') \ - .set_property('leaky', 'downstream') - self.source_pipeline.get_by_name('appsink') \ - .set_property('sync', False) - - def get_sample(self, timeout_secs=10): - try: - # Timeout in case no frames are received. This happens when the - # Hauppauge HDPVR video-capture device loses video. - gst_sample = self.last_sample.get(timeout=timeout_secs) - self.novideo = False - except Queue.Empty: - self.novideo = True - pipeline = self.source_pipeline - if pipeline: - Gst.debug_bin_to_dot_file_with_ts( - pipeline, Gst.DebugGraphDetails.ALL, "NoVideo") - raise NoVideo("No video") - if isinstance(gst_sample, Exception): - raise UITestError(str(gst_sample)) - - return gst_sample - - def frames(self, timeout_secs=None): - for sample in self.gst_samples(timeout_secs=timeout_secs): - with _numpy_from_sample(sample, readonly=True) as frame: - copy = frame.copy() - yield (copy, sample.get_buffer().pts) - - def gst_samples(self, timeout_secs=None): - self.start_timestamp = None - - with self.lock: - while True: - ddebug("user thread: Getting sample at %s" % time.time()) - sample = self.get_sample(max(10, timeout_secs)) - ddebug("user thread: Got sample at %s" % time.time()) - timestamp = sample.get_buffer().pts - - if timeout_secs is not None: - if not self.start_timestamp: - self.start_timestamp = timestamp - if (timestamp - self.start_timestamp > - timeout_secs * Gst.SECOND): - debug("timed out: %d - %d > %d" % ( - timestamp, self.start_timestamp, - timeout_secs * Gst.SECOND)) - return - - sample = _gst_sample_make_writable(sample) - try: - yield sample - 
finally: - self.push_sample(sample) - - def on_new_sample(self, appsink): - sample = appsink.emit("pull-sample") - self.tell_user_thread(sample) - if self.lock.acquire(False): # non-blocking - try: - self.push_sample(sample) - finally: - self.lock.release() - return Gst.FlowReturn.OK - - def tell_user_thread(self, sample_or_exception): - # `self.last_sample` (a synchronised Queue) is how we communicate from - # this thread (the GLib main loop) to the main application thread - # running the user's script. Note that only this thread writes to the - # Queue. - - if isinstance(sample_or_exception, Exception): - ddebug("glib thread: reporting exception to user thread: %s" % - sample_or_exception) - else: - ddebug("glib thread: new sample (timestamp=%s). Queue.qsize: %d" % - (sample_or_exception.get_buffer().pts, - self.last_sample.qsize())) - - # Drop old frame - try: - self.last_sample.get_nowait() - except Queue.Empty: - pass - - self.last_sample.put_nowait(sample_or_exception) - - def draw(self, obj, duration_secs): - if type(obj) in (str, unicode): - obj = ( - datetime.datetime.now().strftime("%H:%M:%S:%f")[:-4] + - ' ' + obj) - self.text_annotations.append((obj, duration_secs, None)) - elif type(obj) is MatchResult: - if obj.timestamp is not None: - self.match_annotations.append(obj) - else: - raise TypeError( - "Can't draw object of type '%s'" % type(obj).__name__) - - def push_sample(self, sample): - # Calculate whether we need to draw any annotations on the output video. 
- now = sample.get_buffer().pts - texts = self.text_annotations - matches = [] - for x in list(texts): - text, duration, end_time = x - if end_time is None: - end_time = now + (duration * Gst.SECOND) - texts.remove(x) - texts.append((text, duration, end_time)) - elif now > end_time: - texts.remove(x) - for match_result in list(self.match_annotations): - if match_result.timestamp == now: - matches.append(match_result) - if now >= match_result.timestamp: - self.match_annotations.remove(match_result) - - now = datetime.datetime.now().strftime("%H:%M:%S:%f")[:-4] - texts = texts + [(now, 0, 0)] - - if texts or matches: # Draw the annotations. - sample = _gst_sample_make_writable(sample) - with _numpy_from_sample(sample) as img: - for i in range(len(texts)): - text, _, _ = texts[len(texts) - i - 1] - origin = (10, (i + 1) * 30) - _draw_text(img, text, origin) - for match_result in matches: - _draw_match(img, match_result.region, match_result.match) - - self.appsrc.props.caps = sample.get_caps() - self.appsrc.emit("push-buffer", sample.get_buffer()) - - def on_error(self, pipeline, _bus, message): - assert message.type == Gst.MessageType.ERROR - Gst.debug_bin_to_dot_file_with_ts( - pipeline, Gst.DebugGraphDetails.ALL, "ERROR") - err, dbg = message.parse_error() - self.tell_user_thread( - UITestError("%s: %s\n%s\n" % (err, err.message, dbg))) - _mainloop.quit() - - def on_warning(self, _bus, message): - assert message.type == Gst.MessageType.WARNING - Gst.debug_bin_to_dot_file_with_ts( - self.source_pipeline, Gst.DebugGraphDetails.ALL, "WARNING") - err, dbg = message.parse_warning() - warn("Warning: %s: %s\n%s\n" % (err, err.message, dbg)) - - def on_eos_from_source_pipeline(self, _bus, _message): - if not self.tearing_down: - warn("Got EOS from source pipeline") - self.restart_source() - - def on_eos_from_sink_pipeline(self, _bus, _message): - debug("Got EOS") - _mainloop.quit() - - def on_underrun(self, _element): - if self.underrun_timeout: - ddebug("underrun: I 
already saw a recent underrun; ignoring") - else: - ddebug("underrun: scheduling 'restart_source' in 2s") - self.underrun_timeout = GObjectTimeout(2, self.restart_source) - self.underrun_timeout.start() - - def on_running(self, _element): - if self.underrun_timeout: - ddebug("running: cancelling underrun timer") - self.underrun_timeout.cancel() - self.underrun_timeout = None - else: - ddebug("running: no outstanding underrun timers; ignoring") - - def restart_source(self, *_args): - warn("Attempting to recover from video loss: " - "Stopping source pipeline and waiting 5s...") - self.source_pipeline.set_state(Gst.State.NULL) - self.source_pipeline = None - GObjectTimeout(5, self.start_source).start() - return False # stop the timeout from running again - - def start_source(self): - if self.tearing_down: - return False - warn("Restarting source pipeline...") - self.create_source_pipeline() - self.source_pipeline.set_state(Gst.State.PLAYING) - warn("Restarted source pipeline") - if self.restart_source_enabled: - self.underrun_timeout.start() - return False # stop the timeout from running again - - @staticmethod - def appsink_await_eos(appsink, timeout=None): - done = threading.Event() - - def on_eos(_appsink): - done.set() - return True - hid = appsink.connect('eos', on_eos) - d = appsink.get_property('eos') or done.wait(timeout) - appsink.disconnect(hid) - return d - - def teardown(self): - self.tearing_down = True - self.source_pipeline, source = None, self.source_pipeline - if source: - for elem in gst_iterate(source.iterate_sources()): - elem.send_event(Gst.Event.new_eos()) # pylint: disable=E1120 - if not self.appsink_await_eos( - source.get_by_name('appsink'), timeout=10): - debug("teardown: Source pipeline did not teardown gracefully") - source.set_state(Gst.State.NULL) - source = None - if not self.novideo: - debug("teardown: Sending eos") - self.appsrc.emit("end-of-stream") - self.mainloop_thread.join(10) - debug("teardown: Exiting (GLib mainloop %s)" % ( - 
"is still alive!" if self.mainloop_thread.isAlive() else "ok")) - - -def _draw_text(numpy_image, text, origin): - (width, height), _ = cv2.getTextSize( - text, fontFace=cv2.FONT_HERSHEY_DUPLEX, fontScale=1.0, thickness=1) - cv2.rectangle( - numpy_image, (origin[0] - 2, origin[1] + 2), - (origin[0] + width + 2, origin[1] - height - 2), - thickness=cv2.cv.CV_FILLED, color=(0, 0, 0)) - cv2.putText( - numpy_image, text, origin, cv2.FONT_HERSHEY_DUPLEX, fontScale=1.0, - color=(255, 255, 255)) - - -def _draw_match(numpy_image, region, match_, thickness=3): - cv2.rectangle( - numpy_image, (region.x, region.y), (region.right, region.bottom), - (32, 0 if match_ else 255, 255), # bgr - thickness=thickness) - - -class GObjectTimeout(object): - """Responsible for setting a timeout in the GTK main loop.""" - def __init__(self, timeout_secs, handler, *args): - self.timeout_secs = timeout_secs - self.handler = handler - self.args = args - self.timeout_id = None - - def start(self): - self.timeout_id = GObject.timeout_add( - self.timeout_secs * 1000, self.handler, *self.args) - - def cancel(self): - if self.timeout_id: - GObject.source_remove(self.timeout_id) - self.timeout_id = None - - -_BGR_CAPS = Gst.Caps.from_string('video/x-raw,format=BGR') - - -def _match(image, template, match_parameters, template_name): - if any(image.shape[x] < template.shape[x] for x in (0, 1)): - raise ValueError("Source image must be larger than template image") - if any(template.shape[x] < 1 for x in (0, 1)): - raise ValueError("Template image must contain some data") - if template.shape[2] != 3: - raise ValueError("Template image must be 3 channel BGR") - if template.dtype != numpy.uint8: - raise ValueError("Template image must be 8-bits per channel") - - first_pass_matched, position, first_pass_certainty = _find_match( - image, template, match_parameters) - matched = ( - first_pass_matched and - _confirm_match(image, position, template, match_parameters)) - - region = Region(position.x, position.y, 
- template.shape[1], template.shape[0]) - - if logging.get_debug_level() > 1: - source_with_roi = image.copy() - _draw_match(source_with_roi, region, first_pass_matched, thickness=1) - _log_image( - source_with_roi, "source_with_roi", "stbt-debug/detect_match") - _log_image_descriptions( - template_name, matched, position, - first_pass_matched, first_pass_certainty, match_parameters) - - return matched, region, first_pass_certainty - - -def _find_match(image, template, match_parameters): - """Search for `template` in the entire `image`. - - This searches the entire image, so speed is more important than accuracy. - False positives are ok; we apply a second pass (`_confirm_match`) to weed - out false positives. - - http://docs.opencv.org/modules/imgproc/doc/object_detection.html - http://opencv-code.com/tutorials/fast-template-matching-with-image-pyramid - """ - - log = functools.partial(_log_image, directory="stbt-debug/detect_match") - log(image, "source") - log(template, "template") - ddebug("Original image %s, template %s" % (image.shape, template.shape)) - - levels = get_config("match", "pyramid_levels", type_=int) - if levels <= 0: - raise ConfigurationError("'match.pyramid_levels' must be > 0") - template_pyramid = _build_pyramid(template, levels) - image_pyramid = _build_pyramid(image, len(template_pyramid)) - roi_mask = None # Initial region of interest: The whole image. 
- - for level in reversed(range(len(template_pyramid))): - - matched, best_match_position, certainty, roi_mask = _match_template( - image_pyramid[level], template_pyramid[level], match_parameters, - roi_mask, level) - - if level == 0 or not matched: - return matched, _upsample(best_match_position, level), certainty - - -def _match_template(image, template, match_parameters, roi_mask, level): - - log = functools.partial(_log_image, directory="stbt-debug/detect_match") - log_prefix = "level%d-" % level - ddebug("Level %d: image %s, template %s" % ( - level, image.shape, template.shape)) - - method = { - 'sqdiff-normed': cv2.TM_SQDIFF_NORMED, - 'ccorr-normed': cv2.TM_CCORR_NORMED, - 'ccoeff-normed': cv2.TM_CCOEFF_NORMED, - }[match_parameters.match_method] - threshold = max( - 0, - match_parameters.match_threshold - (0.2 if level > 0 else 0)) - - matches_heatmap = ( - (numpy.ones if method == cv2.TM_SQDIFF_NORMED else numpy.zeros)( - (image.shape[0] - template.shape[0] + 1, - image.shape[1] - template.shape[1] + 1), - dtype=numpy.float32)) - - if roi_mask is None or any(x < 3 for x in roi_mask.shape): - rois = [ # Initial region of interest: The whole image. 
- _Rect(0, 0, matches_heatmap.shape[1], matches_heatmap.shape[0])] - else: - roi_mask = cv2.pyrUp(roi_mask) - log(roi_mask, log_prefix + "roi_mask") - contours, _ = cv2.findContours( - roi_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) - rois = [ - _Rect(*cv2.boundingRect(x)) - # findContours ignores 1-pixel border of the image - .shift(Position(-1, -1)).expand(_Size(2, 2)) - for x in contours] - - if logging.get_debug_level() > 1: - source_with_rois = image.copy() - for roi in rois: - r = roi - t = _Size(*template.shape[:2]) - s = _Size(*source_with_rois.shape[:2]) - cv2.rectangle( - source_with_rois, - (max(0, r.x), max(0, r.y)), - (min(s.w - 1, r.x + r.w + t.w - 1), - min(s.h - 1, r.y + r.h + t.h - 1)), - (0, 255, 255), - thickness=1) - log(source_with_rois, log_prefix + "source_with_rois") - - for roi in rois: - r = roi.expand(_Size(*template.shape[:2])).shrink(_Size(1, 1)) - ddebug("Level %d: Searching in %s" % (level, roi)) - cv2.matchTemplate( - image[r.to_slice()], - template, - method, - matches_heatmap[roi.to_slice()]) - - log(image, log_prefix + "source") - log(template, log_prefix + "template") - log(matches_heatmap, log_prefix + "source_matchtemplate") - - min_value, max_value, min_location, max_location = cv2.minMaxLoc( - matches_heatmap) - if method == cv2.TM_SQDIFF_NORMED: - certainty = (1 - min_value) - best_match_position = Position(*min_location) - elif method in (cv2.TM_CCORR_NORMED, cv2.TM_CCOEFF_NORMED): - certainty = max_value - best_match_position = Position(*max_location) - else: - raise ValueError("Invalid matchTemplate method '%s'" % method) - - _, new_roi_mask = cv2.threshold( - matches_heatmap, - ((1 - threshold) if method == cv2.TM_SQDIFF_NORMED else threshold), - 255, - (cv2.THRESH_BINARY_INV if method == cv2.TM_SQDIFF_NORMED - else cv2.THRESH_BINARY)) - new_roi_mask = new_roi_mask.astype(numpy.uint8) - log(new_roi_mask, log_prefix + "source_matchtemplate_threshold") - - matched = certainty >= threshold - ddebug("Level %d: %s at %s 
with certainty %s" % (
-        level, "Matched" if matched else "Didn't match",
-        best_match_position, certainty))
-    return (matched, best_match_position, certainty, new_roi_mask)
-
-
-def _build_pyramid(image, levels):
-    """A "pyramid" is [an image, the same image at 1/2 the size, at 1/4, ...]
-
-    As a performance optimisation, image processing algorithms work on a
-    "pyramid" by first identifying regions of interest (ROIs) in the smallest
-    image; if results are positive, they proceed to the next larger image, etc.
-    See http://docs.opencv.org/doc/tutorials/imgproc/pyramids/pyramids.html
-
-    The original-sized image is called "level 0", the next smaller image "level
-    1", and so on. This numbering corresponds to the array index of the
-    "pyramid" array.
-    """
-    pyramid = [image]
-    for _ in range(levels - 1):
-        if any(x < 20 for x in pyramid[-1].shape[:2]):
-            break
-        pyramid.append(cv2.pyrDown(pyramid[-1]))
-    return pyramid
-
-
-def _upsample(position, levels):
-    """Scale up position coordinates from the given pyramid level to level 0.
-
-    There is a loss of precision (unless ``levels`` is 0, in which case this
-    function is a no-op).
-    """
-    return Position(position.x * 2 ** levels, position.y * 2 ** levels)
-
-
-# Order of parameters consistent with ``cv2.boundingRect``.
-class _Rect(namedtuple("_Rect", "x y w h")):
-    def expand(self, size):
-        return _Rect(self.x, self.y, self.w + size.w, self.h + size.h)
-
-    def shrink(self, size):
-        return _Rect(self.x, self.y, self.w - size.w, self.h - size.h)
-
-    def shift(self, position):
-        return _Rect(self.x + position.x, self.y + position.y, self.w, self.h)
-
-    def to_slice(self):
-        """Return a 2-dimensional slice suitable for indexing a numpy array."""
-        return (slice(self.y, self.y + self.h), slice(self.x, self.x + self.w))
-
-
-# Order of parameters consistent with OpenCV's ``numpy.ndarray.shape``.
-class _Size(namedtuple("_Size", "h w")): - pass - - -def _confirm_match(image, position, template, match_parameters): - """Confirm that `template` matches `image` at `position`. - - This only checks `template` at a single position within `image`, so we can - afford to do more computationally-intensive checks than `_find_match`. - """ - - if match_parameters.confirm_method == "none": - return True - - log = functools.partial(_log_image, directory="stbt-debug/detect_match") - - # Set Region Of Interest to the "best match" location - roi = image[ - position.y:(position.y + template.shape[0]), - position.x:(position.x + template.shape[1])] - image_gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) - template_gray = cv2.cvtColor(template, cv2.COLOR_BGR2GRAY) - log(roi, "confirm-source_roi") - log(image_gray, "confirm-source_roi_gray") - log(template_gray, "confirm-template_gray") - - if match_parameters.confirm_method == "normed-absdiff": - cv2.normalize(image_gray, image_gray, 0, 255, cv2.NORM_MINMAX) - cv2.normalize(template_gray, template_gray, 0, 255, cv2.NORM_MINMAX) - log(image_gray, "confirm-source_roi_gray_normalized") - log(template_gray, "confirm-template_gray_normalized") - - absdiff = cv2.absdiff(image_gray, template_gray) - _, thresholded = cv2.threshold( - absdiff, int(match_parameters.confirm_threshold * 255), - 255, cv2.THRESH_BINARY) - eroded = cv2.erode( - thresholded, - cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3)), - iterations=match_parameters.erode_passes) - log(absdiff, "confirm-absdiff") - log(thresholded, "confirm-absdiff_threshold") - log(eroded, "confirm-absdiff_threshold_erode") - - return cv2.countNonZero(eroded) == 0 - - -_frame_number = 0 - - -def _log_image(image, name, directory): - if logging.get_debug_level() <= 1: - return - global _frame_number - if name == "source": - _frame_number += 1 - d = os.path.join(directory, "%05d" % _frame_number) - try: - utils.mkdir_p(d) - except OSError: - warn("Failed to create directory '%s'; won't 
save debug images." % d) - return - with _numpy_from_sample(image, readonly=True) as img: - if img.dtype == numpy.float32: - img = cv2.convertScaleAbs(img, alpha=255) - cv2.imwrite(os.path.join(d, name) + ".png", img) - - -def _log_image_descriptions( - template_name, matched, position, - first_pass_matched, first_pass_certainty, match_parameters): - """Create html file that describes the debug images.""" - - try: - import jinja2 - except ImportError: - warn( - "Not generating html guide to the image-processing debug images, " - "because python 'jinja2' module is not installed.") - return - - d = os.path.join("stbt-debug/detect_match", "%05d" % _frame_number) - - template = jinja2.Template(""" - - - - - - - -
-

- {{template_name}} - {{"matched" if matched else "didn't match"}} -

- -

Searching for template {{link("template")}} - within source {{link("source")}} image. - - {% for level in levels %} - -

At level {{level}}: -

    -
  • Searching for template {{link("template", level)}} - within source regions of interest - {{link("source_with_rois", level)}}. -
  • OpenCV matchTemplate result - {{link("source_matchtemplate", level)}} - with method {{match_parameters.match_method}} - ({{"darkest" if match_parameters.match_method == - "sqdiff-normed" else "lightest"}} - pixel indicates position of best match). -
  • matchTemplate result above match_threshold - {{link("source_matchtemplate_threshold", level)}} - of {{"%g"|format(match_parameters.match_threshold)}} - (white pixels indicate positions above the threshold). - - {% if (level == 0 and first_pass_matched) or level != min(levels) %} -
  • Matched at {{position}} {{link("source_with_roi")}} - with certainty {{"%.4f"|format(first_pass_certainty)}}. - {% else %} -
  • Didn't match (best match at {{position}} - {{link("source_with_roi")}} - with certainty {{"%.4f"|format(first_pass_certainty)}}). - {% endif %} - -
- - {% endfor %} - - {% if first_pass_certainty >= match_parameters.match_threshold %} -

Second pass (confirmation): -

    -
  • Comparing template {{link("confirm-template_gray")}} - against source image's region of interest - {{link("confirm-source_roi_gray")}}. - - {% if match_parameters.confirm_method == "normed-absdiff" %} -
  • Normalised template - {{link("confirm-template_gray_normalized")}} - and source - {{link("confirm-source_roi_gray_normalized")}}. - {% endif %} - -
  • Absolute differences {{link("confirm-absdiff")}}. -
  • Differences above confirm_threshold - {{link("confirm-absdiff_threshold")}} - of {{"%.2f"|format(match_parameters.confirm_threshold)}}. -
  • After eroding - {{link("confirm-absdiff_threshold_erode")}} - {{match_parameters.erode_passes}} - {{"time" if match_parameters.erode_passes == 1 - else "times"}}. - {{"No" if matched else "Some"}} - differences (white pixels) remain, so the template - {{"does" if matched else "doesn't"}} match. -
- {% endif %} - -

For further help please read - stb-tester - image matching parameters. - -

- - - """) - - with open(os.path.join(d, "index.html"), "w") as f: - f.write(template.render( - first_pass_certainty=first_pass_certainty, - first_pass_matched=first_pass_matched, - levels=list(reversed(sorted(set( - [int(re.search(r"level(\d+)-.*", x).group(1)) - for x in glob.glob(os.path.join(d, "level*"))])))), - link=lambda s, level=None: ( - "" - .format("" if level is None else "level%d-" % level, s)), - match_parameters=match_parameters, - matched=matched, - min=min, - position=position, - template_name=template_name, - )) - - -def _find_path(image): - """Searches for the given filename and returns the full path. - - Searches in the directory of the script that called (for example) - detect_match, then in the directory of that script's caller, etc. - """ - - if os.path.isabs(image): - return image - - # stack()[0] is _find_path; - # stack()[1] is _find_path's caller, e.g. detect_match; - # stack()[2] is detect_match's caller (the user script). - for caller in inspect.stack()[2:]: - caller_image = os.path.join( - os.path.dirname(inspect.getframeinfo(caller[0]).filename), - image) - if os.path.isfile(caller_image): - return os.path.abspath(caller_image) - - # Fall back to image from cwd, for convenience of the selftests - return os.path.abspath(image) - - -def _load_mask(mask): - """Loads the given mask file and returns it as an OpenCV image.""" - mask_path = _find_path(mask) - debug("Using mask %s" % mask_path) - if not os.path.isfile(mask_path): - raise UITestError("No such mask file: %s" % mask) - mask_image = cv2.imread(mask_path, cv2.CV_LOAD_IMAGE_GRAYSCALE) - if mask_image is None: - raise UITestError("Failed to load mask file: %s" % mask_path) - return mask_image - - -# Tests -# =========================================================================== - -def test_wait_for_motion_half_motion_str_2of4(): - with _fake_frames_at_half_motion(): - wait_for_motion(consecutive_frames='2/4') - - -def test_wait_for_motion_half_motion_str_2of3(): - with 
_fake_frames_at_half_motion(): - wait_for_motion(consecutive_frames='2/3') - - -def test_wait_for_motion_half_motion_str_3of4(): - with _fake_frames_at_half_motion(): - try: - wait_for_motion(consecutive_frames='3/4') - assert False, "wait_for_motion succeeded unexpectedly" - except MotionTimeout: - pass - - -def test_wait_for_motion_half_motion_int(): - with _fake_frames_at_half_motion(): - try: - wait_for_motion(consecutive_frames=2) - assert False, "wait_for_motion succeeded unexpectedly" - except MotionTimeout: - pass - - -@contextmanager -def _fake_frames_at_half_motion(): - class FakeDisplay(object): - def gst_samples(self, _timeout_secs=10): - data = [ - numpy.zeros((2, 2, 3), dtype=numpy.uint8), - numpy.ones((2, 2, 3), dtype=numpy.uint8) * 255, - ] - for i in range(10): - buf = Gst.Buffer.new_wrapped(data[(i // 2) % 2].flatten()) - buf.pts = i * 1000000000 - yield _gst_sample_make_writable( - Gst.Sample.new(buf, Gst.Caps.from_string( - 'video/x-raw,format=BGR,width=2,height=2'), None, None)) - - def _get_frame(): - return None - - global _display, get_frame # pylint: disable=W0601 - orig_display, orig_get_frame = _display, get_frame - _display, get_frame = FakeDisplay(), _get_frame - yield - _display, get_frame = orig_display, orig_get_frame - - -def test_ocr_on_static_images(): - for image, expected_text, region, mode in [ - # pylint: disable=C0301 - ("Connection-status--white-on-dark-blue.png", "Connection status: Connected", None, None), - ("Connection-status--white-on-dark-blue.png", "Connected", Region(x=210, y=0, width=120, height=40), None), - ("programme--white-on-black.png", "programme", None, None), - ("UJJM--white-text-on-grey-boxes.png", "", None, None), - ("UJJM--white-text-on-grey-boxes.png", "UJJM", None, OcrMode.SINGLE_LINE), - ]: - kwargs = {"region": region} - if mode is not None: - kwargs["mode"] = mode - text = ocr( - cv2.imread(os.path.join( - os.path.dirname(__file__), "..", "tests", "ocr", image)), - **kwargs) - assert text == 
expected_text, ( - "Unexpected text. Expected '%s'. Got: %s" % (expected_text, text)) + _dut.__exit__(None, None, None) diff -Nru stb-tester-22/_stbt/control.py stb-tester-23-1-gf70a21c/_stbt/control.py --- stb-tester-22/_stbt/control.py 2015-03-27 13:35:10.000000000 +0000 +++ stb-tester-23-1-gf70a21c/_stbt/control.py 2015-07-08 17:05:05.000000000 +0000 @@ -73,10 +73,15 @@ """ def __init__(self, display): - self.videosrc = display.source_pipeline.get_by_name("videotestsrc0") - if not self.videosrc: + self.display = display + + @property + def videosrc(self): + videosrc = self.display.source_pipeline.get_by_name("videotestsrc0") + if not videosrc: raise ConfigurationError('The "test" control can only be used ' 'with source-pipeline = "videotestsrc"') + return videosrc def press(self, key): if key not in [ diff -Nru stb-tester-22/_stbt/core.py stb-tester-23-1-gf70a21c/_stbt/core.py --- stb-tester-22/_stbt/core.py 1970-01-01 00:00:00.000000000 +0000 +++ stb-tester-23-1-gf70a21c/_stbt/core.py 2015-07-08 17:05:05.000000000 +0000 @@ -0,0 +1,2342 @@ +# coding: utf-8 +"""Main stb-tester python module. Intended to be used with `stbt run`. + +See `man stbt` and http://stb-tester.com for documentation. + +Copyright 2012-2013 YouView TV Ltd and contributors. +License: LGPL v2.1 or (at your option) any later version (see +https://github.com/stb-tester/stb-tester/blob/master/LICENSE for details). 
+""" + +from __future__ import absolute_import + +import argparse +import datetime +import functools +import glob +import inspect +import os +import Queue +import re +import subprocess +import threading +import time +import traceback +import warnings +from collections import deque, namedtuple +from contextlib import contextmanager +from distutils.version import LooseVersion + +import cv2 +import gi +import numpy +from enum import IntEnum +from gi.repository import GLib, GObject, Gst # pylint: disable=E0611 + +from _stbt import logging, utils +from _stbt.config import ConfigurationError, get_config +from _stbt.gst_hacks import gst_iterate, map_gst_buffer +from _stbt.logging import ddebug, debug, warn + +gi.require_version("Gst", "1.0") + +if getattr(gi, "version_info", (0, 0, 0)) < (3, 12, 0): + GObject.threads_init() +Gst.init(None) + +warnings.filterwarnings( + action="always", category=DeprecationWarning, message='.*stb-tester') + + +# Functions available to stbt scripts +# =========================================================================== + + +class MatchParameters(object): + """Parameters to customise the image processing algorithm used by + `match`, `wait_for_match`, and `press_until_match`. + + You can change the default values for these parameters by setting a key + (with the same name as the corresponding python parameter) in the `[match]` + section of stbt.conf. But we strongly recommend that you don't change the + default values from what is documented here. + + You should only need to change these parameters when you're trying to match + a template image that isn't actually a perfect match -- for example if + there's a translucent background with live TV visible behind it; or if you + have a template image of a button's background and you want it to match even + if the text on the button doesn't match. 
+ + :param str match_method: + The method to be used by the first pass of stb-tester's image matching + algorithm, to find the most likely location of the "template" image + within the larger source image. + + Allowed values are "sqdiff-normed", "ccorr-normed", and "ccoeff-normed". + For the meaning of these parameters, see OpenCV's `cvMatchTemplate + `_. + + We recommend that you don't change this from its default value of + "sqdiff-normed". + + :param float match_threshold: + How strong a result from the first pass must be, to be considered a + match. Valid values range from 0 (anything is considered to match) + to 1 (the match has to be pixel perfect). This defaults to 0.8. + + :param str confirm_method: + The method to be used by the second pass of stb-tester's image matching + algorithm, to confirm that the region identified by the first pass is a + good match. + + The first pass often gives false positives (it reports a "match" for an + image that shouldn't match). The second pass is more CPU-intensive, but + it only checks the position of the image that the first pass identified. + The allowed values are: + + :"none": + Do not confirm the match. Assume that the potential match found is + correct. + + :"absdiff": + Compare the absolute difference of each pixel from the template image + against its counterpart from the candidate region in the source video + frame. + + :"normed-absdiff": + Normalise the pixel values from both the template image and the + candidate region in the source video frame, then compare the absolute + difference as with "absdiff". + + This gives better results with low-contrast images. We recommend setting + this as the default `confirm_method` in stbt.conf, with a + `confirm_threshold` of 0.30. + + :param float confirm_threshold: + The maximum allowed difference between any given pixel from the template + image and its counterpart from the candidate region in the source video + frame, as a fraction of the pixel's total luminance range. 
+ + Valid values range from 0 (more strict) to 1.0 (less strict). + Useful values tend to be around 0.16 for the "absdiff" method, and 0.30 + for the "normed-absdiff" method. + + :param int erode_passes: + After the "absdiff" or "normed-absdiff" absolute difference is taken, + stb-tester runs an erosion algorithm that removes single-pixel differences + to account for noise. Useful values are 1 (the default) and 0 (to disable + this step). + + """ + + def __init__(self, match_method=None, match_threshold=None, + confirm_method=None, confirm_threshold=None, + erode_passes=None): + if match_method is None: + match_method = get_config('match', 'match_method') + if match_threshold is None: + match_threshold = get_config( + 'match', 'match_threshold', type_=float) + if confirm_method is None: + confirm_method = get_config('match', 'confirm_method') + if confirm_threshold is None: + confirm_threshold = get_config( + 'match', 'confirm_threshold', type_=float) + if erode_passes is None: + erode_passes = get_config('match', 'erode_passes', type_=int) + + if match_method not in ( + "sqdiff-normed", "ccorr-normed", "ccoeff-normed"): + raise ValueError("Invalid match_method '%s'" % match_method) + if confirm_method not in ("none", "absdiff", "normed-absdiff"): + raise ValueError("Invalid confirm_method '%s'" % confirm_method) + + self.match_method = match_method + self.match_threshold = match_threshold + self.confirm_method = confirm_method + self.confirm_threshold = confirm_threshold + self.erode_passes = erode_passes + + +class Position(namedtuple('Position', 'x y')): + """A point within the video frame. + + `x` and `y` are integer coordinates (measured in number of pixels) from the + top left corner of the video frame. + """ + pass + + +class Region(namedtuple('Region', 'x y right bottom')): + u""" + ``Region(x, y, width=width, height=height)`` or + ``Region(x, y, right=right, bottom=bottom)`` + + Rectangular region within the video frame. 
+ + For example, given the following regions a, b, and c:: + + - 01234567890123 + 0 ░░░░░░░░ + 1 ░a░░░░░░ + 2 ░░░░░░░░ + 3 ░░░░░░░░ + 4 ░░░░▓▓▓▓░░▓c▓ + 5 ░░░░▓▓▓▓░░▓▓▓ + 6 ░░░░▓▓▓▓░░░░░ + 7 ░░░░▓▓▓▓░░░░░ + 8 ░░░░░░b░░ + 9 ░░░░░░░░░ + + >>> a = Region(0, 0, width=8, height=8) + >>> b = Region(4, 4, right=13, bottom=10) + >>> c = Region(10, 4, width=3, height=2) + >>> a.right + 8 + >>> b.bottom + 10 + >>> b.contains(c), a.contains(b), c.contains(b) + (True, False, False) + >>> b.extend(x=6, bottom=-4) == c + True + >>> a.extend(right=5).contains(c) + True + >>> a.width, a.extend(x=3).width, a.extend(right=-3).width + (8, 5, 5) + >>> print Region.intersect(a, b) + Region(x=4, y=4, width=4, height=4) + >>> Region.intersect(a, b) == Region.intersect(b, a) + True + >>> Region.intersect(c, b) == c + True + >>> print Region.intersect(a, c) + None + >>> print Region.intersect(None, a) + None + >>> quadrant = Region(x=float("-inf"), y=float("-inf"), right=0, bottom=0) + >>> quadrant.translate(2, 2) + Region(x=-inf, y=-inf, right=2, bottom=2) + >>> print c.translate(x=-9, y=-3) + Region(x=1, y=1, width=3, height=2) + >>> Region.intersect(Region.ALL, c) == c + True + >>> Region.ALL + Region.ALL + >>> print Region.ALL + Region.ALL + + .. py:attribute:: x + + The x coordinate of the left edge of the region, measured in pixels + from the left of the video frame (inclusive). + + .. py:attribute:: y + + The y coordinate of the top edge of the region, measured in pixels from + the top of the video frame (inclusive). + + .. py:attribute:: right + + The x coordinate of the right edge of the region, measured in pixels + from the left of the video frame (exclusive). + + .. py:attribute:: bottom + + The y coordinate of the bottom edge of the region, measured in pixels + from the top of the video frame (exclusive). + + ``x``, ``y``, ``right``, and ``bottom`` can be infinite -- that is, + ``float("inf")`` or ``-float("inf")``. 
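The doctests above all follow from the extents representation: because `Region` stores `(x, y, right, bottom)` rather than width/height, intersection is just a per-edge max/min. A standalone sketch of the same arithmetic on plain tuples (illustrative only, not the real class):

```python
def intersect_extents(a, b):
    """Intersection of two (x, y, right, bottom) tuples, or None."""
    if a is None or b is None:
        return None
    x, y = max(a[0], b[0]), max(a[1], b[1])
    right, bottom = min(a[2], b[2]), min(a[3], b[3])
    # Zero or negative overlap on either axis means no intersection.
    return (x, y, right, bottom) if x < right and y < bottom else None
```

With `a = (0, 0, 8, 8)` and `b = (4, 4, 13, 10)` this yields `(4, 4, 8, 8)`, matching the `Region(x=4, y=4, width=4, height=4)` doctest above.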
+ """ + def __new__(cls, x, y, width=None, height=None, right=None, bottom=None): + if (width is None) == (right is None): + raise ValueError("You must specify either 'width' or 'right'") + if (height is None) == (bottom is None): + raise ValueError("You must specify either 'height' or 'bottom'") + if right is None: + right = x + width + if bottom is None: + bottom = y + height + if right <= x: + raise ValueError("'right' must be greater than 'x'") + if bottom <= y: + raise ValueError("'bottom' must be greater than 'y'") + return super(Region, cls).__new__(cls, x, y, right, bottom) + + def __str__(self): + if self == Region.ALL: + return 'Region.ALL' + else: + return 'Region(x=%s, y=%s, width=%s, height=%s)' \ + % (self.x, self.y, self.width, self.height) + + def __repr__(self): + if self == Region.ALL: + return 'Region.ALL' + else: + return super(Region, self).__repr__() + + @property + def width(self): + """The width of the region, measured in pixels.""" + return self.right - self.x + + @property + def height(self): + """The height of the region, measured in pixels.""" + return self.bottom - self.y + + @staticmethod + def from_extents(x, y, right, bottom): + """Create a Region using right and bottom extents rather than width and + height. + + Deprecated since we added ``right`` and ``bottom`` to Region + constructor. + + >>> b = Region.from_extents(4, 4, 13, 10) + >>> print b + Region(x=4, y=4, width=9, height=6) + """ + return Region(x, y, right=right, bottom=bottom) + + @staticmethod + def intersect(a, b): + """ + :returns: The intersection of regions ``a`` and ``b``, or ``None`` if + the regions don't intersect. + + Either ``a`` or ``b`` can be ``None`` so intersect is commutative and + associative. 
+ """ + if a is None or b is None: + return None + else: + extents = (max(a.x, b.x), max(a.y, b.y), + min(a.right, b.right), min(a.bottom, b.bottom)) + if extents[0] < extents[2] and extents[1] < extents[3]: + return Region.from_extents(*extents) + else: + return None + + def contains(self, other): + """:returns: True if ``other`` is entirely contained within self.""" + return (other and self.x <= other.x and self.y <= other.y and + self.right >= other.right and self.bottom >= other.bottom) + + def extend(self, x=0, y=0, right=0, bottom=0): + """ + :returns: A new region with the edges of the region adjusted by the + given amounts. + """ + return Region.from_extents( + self.x + x, self.y + y, self.right + right, self.bottom + bottom) + + def translate(self, x=0, y=0): + """ + :returns: A new region with the position of the region adjusted by the + given amounts. + """ + return Region.from_extents(self.x + x, self.y + y, + self.right + x, self.bottom + y) + +Region.ALL = Region(x=float("-inf"), y=float("-inf"), + right=float("inf"), bottom=float("inf")) + + +def _bounding_box(a, b): + """Find the bounding box of two regions. Returns the smallest region which + contains both regions a and b. + + >>> print _bounding_box(Region(50, 20, 10, 20), Region(20, 30, 10, 20)) + Region(x=20, y=20, width=40, height=30) + >>> print _bounding_box(Region(20, 30, 10, 20), Region(20, 30, 10, 20)) + Region(x=20, y=30, width=10, height=20) + >>> print _bounding_box(None, Region(20, 30, 10, 20)) + Region(x=20, y=30, width=10, height=20) + >>> print _bounding_box(Region(20, 30, 10, 20), None) + Region(x=20, y=30, width=10, height=20) + >>> print _bounding_box(None, None) + None + """ + if a is None: + return b + if b is None: + return a + return Region.from_extents(min(a.x, b.x), min(a.y, b.y), + max(a.right, b.right), max(a.bottom, b.bottom)) + + +class MatchResult(object): + """The result from `match`. + + * ``timestamp``: Video stream timestamp. 
+ * ``match``: Boolean result, the same as evaluating `MatchResult` as a bool. + That is, ``if match_result:`` will behave the same as + ``if match_result.match:``. + * ``region``: The `Region` in the video frame where the image was found. + * ``first_pass_result``: Value between 0 (poor) and 1.0 (excellent match) + from the first pass of stb-tester's two-pass image matching algorithm + (see `MatchParameters` for details). + * ``frame``: The video frame that was searched, in OpenCV format. + * ``image``: The template image that was searched for, as given to `match`. + """ + # pylint: disable=W0621 + def __init__( + self, timestamp, match, region, first_pass_result, frame=None, + image=None): + self.timestamp = timestamp + self.match = match + self.region = region + self.first_pass_result = first_pass_result + if frame is None: + warnings.warn( + "Creating a 'MatchResult' without specifying 'frame' is " + "deprecated. In a future release of stb-tester the 'frame' " + "parameter will be mandatory.", + DeprecationWarning, stacklevel=2) + self.frame = frame + if image is None: + warnings.warn( + "Creating a 'MatchResult' without specifying 'image' is " + "deprecated. 
In a future release of stb-tester the 'image' " + "parameter will be mandatory.", + DeprecationWarning, stacklevel=2) + image = "" + self.image = image + + def __str__(self): + return ( + "MatchResult(timestamp=%s, match=%s, region=%s, " + "first_pass_result=%s, frame=%s, image=%s)" % ( + self.timestamp, + self.match, + self.region, + self.first_pass_result, + "None" if self.frame is None else "%dx%dx%d" % ( + self.frame.shape[1], self.frame.shape[0], + self.frame.shape[2]), + "" if isinstance(self.image, numpy.ndarray) + else repr(self.image))) + + @property + def position(self): + return Position(self.region.x, self.region.y) + + def __nonzero__(self): + return self.match + + +class _AnnotatedTemplate(namedtuple('_AnnotatedTemplate', + 'image name filename')): + @property + def friendly_name(self): + return self.filename or '' + + +def _load_template(template): + if isinstance(template, _AnnotatedTemplate): + return template + if isinstance(template, numpy.ndarray): + return _AnnotatedTemplate(template, None, None) + else: + template_name = _find_path(template) + if not os.path.isfile(template_name): + raise UITestError("No such template file: %s" % template_name) + image = cv2.imread(template_name, cv2.CV_LOAD_IMAGE_COLOR) + if image is None: + raise UITestError("Failed to load template file: %s" % + template_name) + return _AnnotatedTemplate(image, template, template_name) + + +def _crop(frame, region): + if not _image_region(frame).contains(region): + raise ValueError("'frame' doesn't contain 'region'") + return frame[region.y:region.bottom, region.x:region.right] + + +def _image_region(image): + return Region(0, 0, image.shape[1], image.shape[0]) + + +class MotionResult(namedtuple('MotionResult', 'timestamp motion')): + """The result from `detect_motion`. + + * `timestamp`: Video stream timestamp. + * `motion`: Boolean result. + """ + pass + + +class OcrMode(IntEnum): + """Options to control layout analysis and assume a certain form of image. 
+ + For a (brief) description of each option, see the `tesseract(1) + `_ + man page. + """ + ORIENTATION_AND_SCRIPT_DETECTION_ONLY = 0 + PAGE_SEGMENTATION_WITH_OSD = 1 + PAGE_SEGMENTATION_WITHOUT_OSD_OR_OCR = 2 + PAGE_SEGMENTATION_WITHOUT_OSD = 3 + SINGLE_COLUMN_OF_TEXT_OF_VARIABLE_SIZES = 4 + SINGLE_UNIFORM_BLOCK_OF_VERTICALLY_ALIGNED_TEXT = 5 + SINGLE_UNIFORM_BLOCK_OF_TEXT = 6 + SINGLE_LINE = 7 + SINGLE_WORD = 8 + SINGLE_WORD_IN_A_CIRCLE = 9 + SINGLE_CHARACTER = 10 + + # For nicer formatting of `ocr` signature in generated API documentation: + def __repr__(self): + return str(self) + + +class TextMatchResult(namedtuple( + "TextMatchResult", "timestamp match region frame text")): + + """The result from `match_text`. + + * ``timestamp``: Video stream timestamp. + * ``match``: Boolean result, the same as evaluating `TextMatchResult` as a + bool. That is, ``if result:`` will behave the same as + ``if result.match:``. + * ``region``: The `Region` (bounding box) of the text found, or ``None`` if + no text was found. + * ``frame``: The video frame that was searched, in OpenCV format. + * ``text``: The text (unicode string) that was searched for, as given to + `match_text`. 
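Like `MatchResult` above, `TextMatchResult` defines `__nonzero__` so that `if result:` behaves the same as `if result.match:`. A minimal standalone sketch of that idiom (`__nonzero__` is Python 2's truth-value hook; `__bool__` is the Python 3 spelling):

```python
class Result(object):
    """Toy result object illustrating the truthiness idiom; not the real class."""
    def __init__(self, match):
        self.match = match

    def __nonzero__(self):      # Python 2 truth-value hook
        return self.match

    __bool__ = __nonzero__      # same behaviour under Python 3
```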
+ """ + # pylint: disable=E1101 + def __nonzero__(self): + return self.match + + def __str__(self): + return ( + "TextMatchResult(timestamp=%s, match=%s, region=%s, frame=%s, " + "text=%s)" % ( + self.timestamp, + self.match, + self.region, + "%dx%dx%d" % (self.frame.shape[1], self.frame.shape[0], + self.frame.shape[2]), + repr(self.text))) + + +def new_device_under_test_from_config( + gst_source_pipeline=None, gst_sink_pipeline=None, control_uri=None, + save_video=False, restart_source=None, transformation_pipeline=None): + from _stbt.control import uri_to_remote + + if gst_source_pipeline is None: + gst_source_pipeline = get_config('global', 'source_pipeline') + if gst_sink_pipeline is None: + gst_sink_pipeline = get_config('global', 'sink_pipeline') + if control_uri is None: + control_uri = get_config('global', 'control') + if restart_source is None: + restart_source = get_config('global', 'restart_source') + if transformation_pipeline is None: + gst_sink_pipeline = get_config('global', 'transformation_pipeline') + + display = Display( + gst_source_pipeline, gst_sink_pipeline, save_video, restart_source, + transformation_pipeline) + return DeviceUnderTest( + display=display, control=uri_to_remote(control_uri, display)) + + +class DeviceUnderTest(object): + def __init__(self, display=None, control=None): + self._time_of_last_press = None + self._display = display + self._control = control + + def __enter__(self): + if self._display: + self._display.startup() + return self + + def __exit__(self, _exc_type, _exc_value, _traceback): + if self._display: + self._display.teardown() + self._display = None + self._control = None + + def press(self, key, interpress_delay_secs=None): + if interpress_delay_secs is None: + interpress_delay_secs = get_config( + "press", "interpress_delay_secs", type_=float) + if self._time_of_last_press is not None: + # `sleep` is inside a `while` loop because the actual suspension + # time of `sleep` may be less than that requested. 
+ while True: + seconds_to_wait = ( + self._time_of_last_press - datetime.datetime.now() + + datetime.timedelta(seconds=interpress_delay_secs) + ).total_seconds() + if seconds_to_wait > 0: + time.sleep(seconds_to_wait) + else: + break + + self._control.press(key) + self._time_of_last_press = datetime.datetime.now() + self.draw_text(key, duration_secs=3) + + def draw_text(self, text, duration_secs=3): + self._display.draw(text, duration_secs) + + def match(self, image, frame=None, match_parameters=None, + region=Region.ALL): + + if match_parameters is None: + match_parameters = MatchParameters() + + template = _load_template(image) + + grabbed_from_live = (frame is None) + if grabbed_from_live: + frame = self._display.get_sample() + + with _numpy_from_sample(frame, readonly=True) as npframe: + region = Region.intersect(_image_region(npframe), region) + + matched, match_region, first_pass_certainty = _match( + _crop(npframe, region), template.image, match_parameters, + template.friendly_name) + + match_region = match_region.translate(region.x, region.y) + result = MatchResult( + _get_frame_timestamp(frame), matched, match_region, + first_pass_certainty, numpy.copy(npframe), + (template.name or template.image)) + + if grabbed_from_live: + self._display.draw(result, None) + + if result.match: + debug("Match found: %s" % str(result)) + else: + debug("No match found. 
Closest match: %s" % str(result)) + + return result + + def detect_match(self, image, timeout_secs=10, match_parameters=None): + template = _load_template(image) + + debug("Searching for " + template.friendly_name) + + for sample in self._display.gst_samples(timeout_secs): + result = self.match( + template, frame=sample, match_parameters=match_parameters) + self._display.draw(result, None) + yield result + + def detect_motion(self, timeout_secs=10, noise_threshold=None, mask=None): + if noise_threshold is None: + noise_threshold = get_config( + 'motion', 'noise_threshold', type_=float) + + debug("Searching for motion") + + mask_image = None + if mask: + mask_image = _load_mask(mask) + + previous_frame_gray = None + log = functools.partial( + _log_image, directory="stbt-debug/detect_motion") + + for sample in self._display.gst_samples(timeout_secs): + with _numpy_from_sample(sample, readonly=True) as frame: + frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) + log(frame_gray, "source") + + if previous_frame_gray is None: + if (mask_image is not None and + mask_image.shape[:2] != frame_gray.shape[:2]): + raise UITestError( + "The dimensions of the mask '%s' %s don't match the " + "video frame %s" % ( + mask, mask_image.shape, frame_gray.shape)) + previous_frame_gray = frame_gray + continue + + absdiff = cv2.absdiff(frame_gray, previous_frame_gray) + previous_frame_gray = frame_gray + log(absdiff, "absdiff") + + if mask_image is not None: + absdiff = cv2.bitwise_and(absdiff, mask_image) + log(mask_image, "mask") + log(absdiff, "absdiff_masked") + + _, thresholded = cv2.threshold( + absdiff, int((1 - noise_threshold) * 255), 255, + cv2.THRESH_BINARY) + eroded = cv2.erode( + thresholded, + cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))) + log(thresholded, "absdiff_threshold") + log(eroded, "absdiff_threshold_erode") + + motion = (cv2.countNonZero(eroded) > 0) + + # Visualisation: Highlight in red the areas where we detected motion + if motion: + with 
_numpy_from_sample(sample) as frame: + cv2.add( + frame, + numpy.multiply( + numpy.ones(frame.shape, dtype=numpy.uint8), + (0, 0, 255), # bgr + dtype=numpy.uint8), + mask=cv2.dilate( + thresholded, + cv2.getStructuringElement( + cv2.MORPH_ELLIPSE, (3, 3)), + iterations=1), + dst=frame) + + result = MotionResult(sample.get_buffer().pts, motion) + debug("%s found: %s" % ( + "Motion" if motion else "No motion", str(result))) + yield result + + def wait_for_match(self, image, timeout_secs=10, consecutive_matches=1, + match_parameters=None): + + if match_parameters is None: + match_parameters = MatchParameters() + + match_count = 0 + last_pos = Position(0, 0) + image = _load_template(image) + for res in self.detect_match( + image, timeout_secs, match_parameters=match_parameters): + if res.match and (match_count == 0 or res.position == last_pos): + match_count += 1 + else: + match_count = 0 + last_pos = res.position + if match_count == consecutive_matches: + debug("Matched " + image.friendly_name) + return res + + raise MatchTimeout(res.frame, image.friendly_name, timeout_secs) # pylint: disable=W0631,C0301 + + def press_until_match( + self, + key, + image, + interval_secs=None, + max_presses=None, + match_parameters=None): + + if interval_secs is None: + # Should this be float? 
+ interval_secs = get_config( + "press_until_match", "interval_secs", type_=int) + if max_presses is None: + max_presses = get_config( + "press_until_match", "max_presses", type_=int) + + if match_parameters is None: + match_parameters = MatchParameters() + + i = 0 + + while True: + try: + return self.wait_for_match(image, timeout_secs=interval_secs, + match_parameters=match_parameters) + except MatchTimeout: + if i < max_presses: + self.press(key) + i += 1 + else: + raise + + def wait_for_motion( + self, timeout_secs=10, consecutive_frames=None, + noise_threshold=None, mask=None): + + if consecutive_frames is None: + consecutive_frames = get_config('motion', 'consecutive_frames') + + consecutive_frames = str(consecutive_frames) + if '/' in consecutive_frames: + motion_frames = int(consecutive_frames.split('/')[0]) + considered_frames = int(consecutive_frames.split('/')[1]) + else: + motion_frames = int(consecutive_frames) + considered_frames = int(consecutive_frames) + + if motion_frames > considered_frames: + raise ConfigurationError( + "`motion_frames` exceeds `considered_frames`") + + debug("Waiting for %d out of %d frames with motion" % ( + motion_frames, considered_frames)) + + matches = deque(maxlen=considered_frames) + for res in self.detect_motion(timeout_secs, noise_threshold, mask): + matches.append(res.motion) + if matches.count(True) >= motion_frames: + debug("Motion detected.") + return res + + screenshot = self.get_frame() + raise MotionTimeout(screenshot, mask, timeout_secs) + + def ocr(self, frame=None, region=Region.ALL, + mode=OcrMode.PAGE_SEGMENTATION_WITHOUT_OSD, + lang="eng", tesseract_config=None, tesseract_user_words=None, + tesseract_user_patterns=None): + + if frame is None: + frame = self._display.get_sample() + + if region is None: + warnings.warn( + "Passing region=None to ocr is deprecated since 0.21 and the " + "meaning will change in a future version. 
To OCR an entire " + "video frame pass region=Region.ALL instead", + DeprecationWarning, stacklevel=2) + region = Region.ALL + + text, region = _tesseract( + frame, region, mode, lang, tesseract_config, + user_patterns=tesseract_user_patterns, + user_words=tesseract_user_words) + text = text.decode('utf-8').strip().translate(_ocr_transtab) + debug(u"OCR in region %s read '%s'." % (region, text)) + return text + + def match_text(self, text, frame=None, region=Region.ALL, + mode=OcrMode.PAGE_SEGMENTATION_WITHOUT_OSD, lang="eng", + tesseract_config=None): + + import lxml.etree + if frame is None: + frame = self.get_frame() + + _config = dict(tesseract_config or {}) + _config['tessedit_create_hocr'] = 1 + + ts = _get_frame_timestamp(frame) + + xml, region = _tesseract(frame, region, mode, lang, _config) + if xml == '': + return TextMatchResult(ts, False, None, frame, text) + hocr = lxml.etree.fromstring(xml) + p = _hocr_find_phrase(hocr, text.split()) + if p: + # Find bounding box + box = None + for _, elem in p: + box = _bounding_box(box, _hocr_elem_region(elem)) + # _tesseract crops to region and scales up by a factor of 3 so we + # must undo this transformation here. 
+ box = Region.from_extents( + region.x + box.x // 3, region.y + box.y // 3, + region.x + box.right // 3, region.y + box.bottom // 3) + return TextMatchResult(ts, True, box, frame, text) + else: + return TextMatchResult(ts, False, None, frame, text) + + def frames(self, timeout_secs=None): + return self._display.frames(timeout_secs) + + def get_frame(self): + with _numpy_from_sample( + self._display.get_sample(), readonly=True) as frame: + return frame.copy() + + def is_screen_black(self, frame=None, mask=None, threshold=None): + if threshold is None: + threshold = get_config('is_screen_black', 'threshold', type_=int) + if mask: + mask = _load_mask(mask) + if frame is None: + frame = self._display.get_sample() + with _numpy_from_sample(frame, readonly=True) as f: + greyframe = cv2.cvtColor(f, cv2.COLOR_BGR2GRAY) + _, greyframe = cv2.threshold( + greyframe, threshold, 255, cv2.THRESH_BINARY) + _, maxVal, _, _ = cv2.minMaxLoc(greyframe, mask) + if logging.get_debug_level() > 1: + _log_image(frame, 'source', 'stbt-debug/is_screen_black') + if mask is not None: + _log_image(mask, 'mask', 'stbt-debug/is_screen_black') + _log_image(numpy.bitwise_and(greyframe, mask), + 'non-black-regions-after-masking', + 'stbt-debug/is_screen_black') + else: + _log_image(greyframe, 'non-black-regions-after-masking', + 'stbt-debug/is_screen_black') + return maxVal == 0 + +# Utility functions +# =========================================================================== + + +def save_frame(image, filename): + """Saves an OpenCV image to the specified file. + + Takes an image obtained from `get_frame` or from the `screenshot` + property of `MatchTimeout` or `MotionTimeout`. + """ + cv2.imwrite(filename, image) + + +def wait_until(callable_, timeout_secs=10, interval_secs=0): + """Wait until a condition becomes true, or until a timeout. + + ``callable_`` is any python callable, such as a function or a lambda + expression. 
It will be called repeatedly (with a delay of ``interval_secs`` + seconds between successive calls) until it succeeds (that is, it returns a + truthy value) or until ``timeout_secs`` seconds have passed. In both cases, + ``wait_until`` returns the value that ``callable_`` returns. + + After you send a remote-control signal to the system-under-test it usually + takes a few frames to react, so a test script like this would probably + fail:: + + press("KEY_EPG") + assert match("guide.png") + + Instead, use this:: + + press("KEY_EPG") + assert wait_until(lambda: match("guide.png")) + + Note that instead of the above `assert wait_until(...)` you could use + `wait_for_match("guide.png")`. `wait_until` is a generic solution that + also works with stbt's other functions, like `match_text` and + `is_screen_black`. + + `wait_until` allows composing more complex conditions, such as:: + + # Wait until something disappears: + assert wait_until(lambda: not match("xyz.png")) + + # Assert that something doesn't appear within 10 seconds: + assert not wait_until(lambda: match("xyz.png")) + + # Assert that two images are present at the same time: + assert wait_until(lambda: match("a.png") and match("b.png")) + + # Wait but don't raise an exception: + if not wait_until(lambda: match("xyz.png")): + do_something_else() + + There are some drawbacks to using `assert` instead of `wait_for_match`: + + * The exception message won't contain the reason why the match failed + (unless you specify it as a second parameter to `assert`, which is + tedious and we don't expect you to do it), and + * The exception won't have the offending video-frame attached (so the + screenshot in the test-run artifacts will be a few frames later than the + frame that actually caused the test to fail). + + We hope to solve both of the above drawbacks at some point in the future. 
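The loop that implements the docstring above is a simple poll-and-retry. This standalone sketch adds injectable `_time`/`_sleep` hooks (names invented here for testability; they are not part of the stb-tester API) so the retry behaviour can be exercised without real delays:

```python
import time

def wait_until(callable_, timeout_secs=10, interval_secs=0,
               _time=time.time, _sleep=time.sleep):
    expiry_time = _time() + timeout_secs
    while True:
        result = callable_()
        if result:
            return result        # truthy: success, return whatever it returned
        if _time() > expiry_time:
            return result        # falsy: caller can detect the timeout
        _sleep(interval_secs)
```

Returning the callable's own value in both branches is what makes `assert wait_until(lambda: match("guide.png"))` work: on success the caller gets the `MatchResult` itself.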
+ """ + expiry_time = time.time() + timeout_secs + while True: + e = callable_() + if e: + return e + if time.time() > expiry_time: + debug("wait_until timed out: %s" % _callable_description(callable_)) + return e + time.sleep(interval_secs) + + +def _callable_description(callable_): + """Helper to provide nicer debug output when `wait_until` fails. + + >>> _callable_description(wait_until) + 'wait_until' + >>> _callable_description( + ... lambda: stbt.press("OK")) + ' lambda: stbt.press("OK"))\\n' + """ + d = callable_.__name__ + if d == "": + try: + d = inspect.getsource(callable_) + except IOError: + pass + return d + + +@contextmanager +def as_precondition(message): + """Context manager that replaces test failures with test errors. + + Stb-tester's reports show test failures (that is, `UITestFailure` or + `AssertionError` exceptions) as red results, and test errors (that is, + unhandled exceptions of any other type) as yellow results. Note that + `wait_for_match`, `wait_for_motion`, and similar functions raise a + `UITestFailure` when they detect a failure. By running such functions + inside an `as_precondition` context, any `UITestFailure` or + `AssertionError` exceptions they raise will be caught, and a + `PreconditionError` will be raised instead. + + When running a single testcase hundreds or thousands of times to reproduce + an intermittent defect, it is helpful to mark unrelated failures as test + errors (yellow) rather than test failures (red), so that you can focus on + diagnosing the failures that are most likely to be the particular defect + you are looking for. For more details see `Test failures vs. errors + `_. + + :param str message: + A description of the precondition. Word this positively: "Channels + tuned", not "Failed to tune channels". + + :raises: + `PreconditionError` if the wrapped code block raises a `UITestFailure` + or `AssertionError`. 
+ + Example:: + + def test_that_the_on_screen_id_is_shown_after_booting(): + channel = 100 + + with stbt.as_precondition("Tuned to channel %s" % channel): + mainmenu.close_any_open_menu() + channels.goto_channel(channel) + power.cold_reboot() + assert channels.is_on_channel(channel) + + stbt.wait_for_match("on-screen-id.png") + + """ + try: + yield + except (UITestFailure, AssertionError) as e: + debug("stbt.as_precondition caught a %s exception and will " + "re-raise it as PreconditionError.\nOriginal exception was:\n%s" + % (type(e).__name__, traceback.format_exc(e))) + exc = PreconditionError(message, e) + if hasattr(e, 'screenshot'): + exc.screenshot = e.screenshot # pylint: disable=W0201 + raise exc + + +class UITestError(Exception): + """The test script had an unrecoverable error.""" + pass + + +class UITestFailure(Exception): + """The test failed because the system under test didn't behave as expected. + + Inherit from this if you need to define your own test-failure exceptions. + """ + pass + + +class NoVideo(UITestFailure): + """No video available from the source pipeline.""" + pass + + +class MatchTimeout(UITestFailure): + """Exception raised by `wait_for_match`. + + * ``screenshot``: The last video frame that `wait_for_match` checked before + timing out. + * ``expected``: Filename of the image that was being searched for. + * ``timeout_secs``: Number of seconds that the image was searched for. + """ + def __init__(self, screenshot, expected, timeout_secs): + super(MatchTimeout, self).__init__() + self.screenshot = screenshot + self.expected = expected + self.timeout_secs = timeout_secs + + def __str__(self): + return "Didn't find match for '%s' within %g seconds." % ( + self.expected, self.timeout_secs) + + +class MotionTimeout(UITestFailure): + """Exception raised by `wait_for_motion`. + + * ``screenshot``: The last video frame that `wait_for_motion` checked before + timing out. + * ``mask``: Filename of the mask that was used, if any. 
+ * ``timeout_secs``: Number of seconds that motion was searched for. + """ + def __init__(self, screenshot, mask, timeout_secs): + super(MotionTimeout, self).__init__() + self.screenshot = screenshot + self.mask = mask + self.timeout_secs = timeout_secs + + def __str__(self): + return "Didn't find motion%s within %g seconds." % ( + " (with mask '%s')" % self.mask if self.mask else "", + self.timeout_secs) + + +class PreconditionError(UITestError): + """Exception raised by `as_precondition`.""" + def __init__(self, message, original_exception): + super(PreconditionError, self).__init__() + self.message = message + self.original_exception = original_exception + + def __str__(self): + return ( + "Didn't meet precondition '%s' (original exception was: %s)" + % (self.message, self.original_exception)) + + +# stbt-run initialisation and convenience functions +# (you will need these if writing your own version of stbt-run) +# =========================================================================== + +def argparser(): + parser = argparse.ArgumentParser() + parser.add_argument( + '--control', + default=get_config('global', 'control'), + help='The remote control to control the stb (default: %(default)s)') + parser.add_argument( + '--source-pipeline', + default=get_config('global', 'source_pipeline'), + help='A gstreamer pipeline to use for A/V input (default: ' + '%(default)s)') + parser.add_argument( + '--sink-pipeline', + default=get_config('global', 'sink_pipeline'), + help='A gstreamer pipeline to use for video output ' + '(default: %(default)s)') + parser.add_argument( + '--restart-source', action='store_true', + default=(get_config('global', 'restart_source').lower() in + ("1", "yes", "true", "on")), + help='Restart the GStreamer source pipeline when video loss is ' + 'detected') + + logging.argparser_add_verbose_argument(parser) + + return parser + + +# Internal +# =========================================================================== + +if 
hasattr(GLib.MainLoop, 'new'): + _mainloop = GLib.MainLoop.new(context=None, is_running=False) +else: + # Ubuntu 12.04 (Travis) support: PyGObject <3.7.2 doesn't expose the "new" + # constructor we'd like to be using, so fall back to __init__. This means + # Ctrl-C is broken on 12.04 and threading will behave differently on Travis + # than on our supported systems. + _mainloop = GLib.MainLoop() + + +def _gst_sample_make_writable(sample): + if sample.get_buffer().mini_object.is_writable(): + return sample + else: + return Gst.Sample.new( + sample.get_buffer().copy_region( + Gst.BufferCopyFlags.FLAGS | Gst.BufferCopyFlags.TIMESTAMPS | + Gst.BufferCopyFlags.META | Gst.BufferCopyFlags.MEMORY, 0, + sample.get_buffer().get_size()), + sample.get_caps(), + sample.get_segment(), + sample.get_info()) + + +@contextmanager +def _numpy_from_sample(sample, readonly=False): + """ + Allow the contents of a GstSample to be read (and optionally changed) as a + numpy array. The provided numpy array is a view onto the contents of the + GstBuffer in the sample provided. The data is only valid within the `with:` + block where this contextmanager is used so the provided array should not + be referenced outside the `with:` block. If you want to use it elsewhere + either copy the data with `numpy.ndarray.copy()` or reference the GstSample + directly. + + A `numpy.ndarray` may be passed as sample, in which case this + contextmanager is a no-op. This makes it easier to create functions which + will accept either numpy arrays or GstSamples providing a migration path + for reducing copying in stb-tester. + + :param sample: Either a GstSample or a `numpy.ndarray` containing the data + you wish to manipulate as a `numpy.ndarray` + :param readonly: bool. Determines whether you want to just read or change + the data contained within sample. If True the GstSample + passed must be writeable or ValueError will be raised. + Use `stbt.gst_sample_make_writable` to get a writable + `GstSample`. 
+ + >>> s = Gst.Sample.new(Gst.Buffer.new_wrapped("hello"), + ... Gst.Caps.from_string("video/x-raw"), None, None) + >>> with _numpy_from_sample(s) as a: + ... print a + [104 101 108 108 111] + """ + if isinstance(sample, numpy.ndarray): + yield sample + return + if not isinstance(sample, Gst.Sample): + raise TypeError("numpy_from_gstsample must take a Gst.Sample or a " + "numpy.ndarray. Received a %s" % str(type(sample))) + + caps = sample.get_caps() + flags = Gst.MapFlags.READ + if not readonly: + flags |= Gst.MapFlags.WRITE + + with map_gst_buffer(sample.get_buffer(), flags) as buf: + array = numpy.frombuffer((buf), dtype=numpy.uint8) + array.flags.writeable = not readonly + if caps.get_structure(0).get_value('format') in ['BGR', 'RGB']: + array.shape = (caps.get_structure(0).get_value('height'), + caps.get_structure(0).get_value('width'), + 3) + yield array + + +def _test_that_mapping_a_sample_readonly_gives_a_readonly_array(): + Gst.init([]) + s = Gst.Sample.new(Gst.Buffer.new_wrapped("hello"), + Gst.Caps.from_string("video/x-raw"), None, None) + with _numpy_from_sample(s, readonly=True) as ro: + try: + ro[0] = 3 + assert False, 'Writing elements should have thrown' + except (ValueError, RuntimeError): + # Different versions of numpy raise different exceptions + pass + + +def _test_passing_a_numpy_ndarray_as_sample_is_a_noop(): + a = numpy.ndarray((5, 2)) + with _numpy_from_sample(a) as m: + assert a is m + + +def _test_that_dimensions_of_array_are_according_to_caps(): + s = Gst.Sample.new(Gst.Buffer.new_wrapped( + "row 1 4 px row 2 4 px row 3 4 px "), + Gst.Caps.from_string("video/x-raw,format=BGR,width=4,height=3"), + None, None) + with _numpy_from_sample(s, readonly=True) as a: + assert a.shape == (3, 4, 3) + + +def _get_frame_timestamp(frame): + if isinstance(frame, Gst.Sample): + return frame.get_buffer().pts + else: + return None + + +class Display(object): + def __init__(self, user_source_pipeline, user_sink_pipeline, + save_video, + 
restart_source=False, + transformation_pipeline='identity'): + self.novideo = False + self.lock = threading.RLock() # Held by whoever is consuming frames + self.last_sample = Queue.Queue(maxsize=1) + self.source_pipeline = None + self.start_timestamp = None + self.underrun_timeout = None + self.tearing_down = False + + self.annotations_lock = threading.Lock() + self.text_annotations = [] + self.match_annotations = [] + + self.restart_source_enabled = restart_source + + appsink = ( + "appsink name=appsink max-buffers=1 drop=false sync=true " + "emit-signals=true " + "caps=video/x-raw,format=BGR") + # Notes on the source pipeline: + # * _stbt_raw_frames_queue is kept small to reduce the amount of slack + # (and thus the latency) of the pipeline. + # * _stbt_user_data_queue before the decodebin is large. We don't want + # to drop encoded packets as this will cause significant image + # artifacts in the decoded buffers. We make the assumption that we + # have enough horse-power to decode the incoming stream and any delays + # will be transient otherwise it could start filling up causing + # increased latency. + self.source_pipeline_description = " ! ".join([ + user_source_pipeline, + 'queue name=_stbt_user_data_queue max-size-buffers=0 ' + ' max-size-bytes=0 max-size-time=10000000000', + "decodebin", + 'queue name=_stbt_raw_frames_queue max-size-buffers=2', + 'videoconvert', + 'video/x-raw,format=BGR', + transformation_pipeline, + appsink]) + self.create_source_pipeline() + + if save_video: + if not save_video.endswith(".webm"): + save_video += ".webm" + debug("Saving video to '%s'" % save_video) + video_pipeline = ( + "t. ! queue leaky=downstream ! videoconvert ! " + "vp8enc cpu-used=6 min_quantizer=32 max_quantizer=32 ! " + "webmmux ! filesink location=%s" % save_video) + else: + video_pipeline = "" + + sink_pipeline_description = " ".join([ + "appsrc name=appsrc format=time " + + "caps=video/x-raw,format=(string)BGR !", + "tee name=t", + video_pipeline, + "t. ! 
queue leaky=downstream ! videoconvert !", + user_sink_pipeline + ]) + + self.sink_pipeline = Gst.parse_launch(sink_pipeline_description) + sink_bus = self.sink_pipeline.get_bus() + sink_bus.connect( + "message::error", + lambda bus, msg: self.on_error(self.sink_pipeline, bus, msg)) + sink_bus.connect("message::warning", self.on_warning) + sink_bus.connect("message::eos", self.on_eos_from_sink_pipeline) + sink_bus.add_signal_watch() + self.appsrc = self.sink_pipeline.get_by_name("appsrc") + + debug("source pipeline: %s" % self.source_pipeline_description) + debug("sink pipeline: %s" % sink_pipeline_description) + + self.mainloop_thread = threading.Thread(target=_mainloop.run) + self.mainloop_thread.daemon = True + + def create_source_pipeline(self): + self.source_pipeline = Gst.parse_launch( + self.source_pipeline_description) + source_bus = self.source_pipeline.get_bus() + source_bus.connect( + "message::error", + lambda bus, msg: self.on_error(self.source_pipeline, bus, msg)) + source_bus.connect("message::warning", self.on_warning) + source_bus.connect("message::eos", self.on_eos_from_source_pipeline) + source_bus.add_signal_watch() + appsink = self.source_pipeline.get_by_name("appsink") + appsink.connect("new-sample", self.on_new_sample) + + if self.restart_source_enabled: + # Handle loss of video (but without end-of-stream event) from the + # Hauppauge HDPVR capture device. 
+ source_queue = self.source_pipeline.get_by_name( + "_stbt_user_data_queue") + self.start_timestamp = None + source_queue.connect("underrun", self.on_underrun) + source_queue.connect("running", self.on_running) + + def set_source_pipeline_playing(self): + if (self.source_pipeline.set_state(Gst.State.PAUSED) + == Gst.StateChangeReturn.NO_PREROLL): + # This is a live source, drop frames if we get behind + self.source_pipeline.get_by_name('_stbt_raw_frames_queue') \ + .set_property('leaky', 'downstream') + self.source_pipeline.get_by_name('appsink') \ + .set_property('sync', False) + + self.source_pipeline.set_state(Gst.State.PLAYING) + + def get_sample(self, timeout_secs=10): + try: + # Timeout in case no frames are received. This happens when the + # Hauppauge HDPVR video-capture device loses video. + gst_sample = self.last_sample.get(timeout=timeout_secs) + self.novideo = False + except Queue.Empty: + self.novideo = True + pipeline = self.source_pipeline + if pipeline: + Gst.debug_bin_to_dot_file_with_ts( + pipeline, Gst.DebugGraphDetails.ALL, "NoVideo") + raise NoVideo("No video") + if isinstance(gst_sample, Exception): + raise UITestError(str(gst_sample)) + + return gst_sample + + def frames(self, timeout_secs=None): + for sample in self.gst_samples(timeout_secs=timeout_secs): + with _numpy_from_sample(sample, readonly=True) as frame: + copy = frame.copy() + yield (copy, sample.get_buffer().pts) + + def gst_samples(self, timeout_secs=None): + self.start_timestamp = None + + with self.lock: + while True: + ddebug("user thread: Getting sample at %s" % time.time()) + sample = self.get_sample(max(10, timeout_secs)) + ddebug("user thread: Got sample at %s" % time.time()) + timestamp = sample.get_buffer().pts + + if timeout_secs is not None: + if not self.start_timestamp: + self.start_timestamp = timestamp + if (timestamp - self.start_timestamp > + timeout_secs * Gst.SECOND): + debug("timed out: %d - %d > %d" % ( + timestamp, self.start_timestamp, + timeout_secs * 
Gst.SECOND)) + return + + sample = _gst_sample_make_writable(sample) + try: + yield sample + finally: + self.push_sample(sample) + + def on_new_sample(self, appsink): + sample = appsink.emit("pull-sample") + self.tell_user_thread(sample) + if self.lock.acquire(False): # non-blocking + try: + self.push_sample(sample) + finally: + self.lock.release() + return Gst.FlowReturn.OK + + def tell_user_thread(self, sample_or_exception): + # `self.last_sample` (a synchronised Queue) is how we communicate from + # this thread (the GLib main loop) to the main application thread + # running the user's script. Note that only this thread writes to the + # Queue. + + if isinstance(sample_or_exception, Exception): + ddebug("glib thread: reporting exception to user thread: %s" % + sample_or_exception) + else: + ddebug("glib thread: new sample (timestamp=%s). Queue.qsize: %d" % + (sample_or_exception.get_buffer().pts, + self.last_sample.qsize())) + + # Drop old frame + try: + self.last_sample.get_nowait() + except Queue.Empty: + pass + + self.last_sample.put_nowait(sample_or_exception) + + def draw(self, obj, duration_secs): + with self.annotations_lock: + if type(obj) in (str, unicode): + obj = ( + datetime.datetime.now().strftime("%H:%M:%S.%f")[:-4] + + ' ' + obj) + self.text_annotations.append( + {"text": obj, "duration": duration_secs * Gst.SECOND}) + elif type(obj) is MatchResult: + if obj.timestamp is not None: + self.match_annotations.append(obj) + else: + raise TypeError( + "Can't draw object of type '%s'" % type(obj).__name__) + + def push_sample(self, sample): + # Calculate whether we need to draw any annotations on the output video. 
+ now = sample.get_buffer().pts + texts = [] + matches = [] + with self.annotations_lock: + for x in self.text_annotations: + x.setdefault('start_time', now) + if now < x['start_time'] + x['duration']: + texts.append(x) + self.text_annotations = texts[:] + for match_result in list(self.match_annotations): + if match_result.timestamp == now: + matches.append(match_result) + if now >= match_result.timestamp: + self.match_annotations.remove(match_result) + + sample = _gst_sample_make_writable(sample) + with _numpy_from_sample(sample) as img: + _draw_text( + img, datetime.datetime.now().strftime("%H:%M:%S.%f")[:-4], + (10, 30), (255, 255, 255)) + for i, x in enumerate(reversed(texts)): + origin = (10, (i + 2) * 30) + age = float(now - x['start_time']) / (3 * Gst.SECOND) + color = (int(255 * max([1 - age, 0.5])),) * 3 + _draw_text(img, x['text'], origin, color) + for match_result in matches: + _draw_match(img, match_result.region, match_result.match) + + self.appsrc.props.caps = sample.get_caps() + self.appsrc.emit("push-buffer", sample.get_buffer()) + + def on_error(self, pipeline, _bus, message): + assert message.type == Gst.MessageType.ERROR + Gst.debug_bin_to_dot_file_with_ts( + pipeline, Gst.DebugGraphDetails.ALL, "ERROR") + err, dbg = message.parse_error() + self.tell_user_thread( + UITestError("%s: %s\n%s\n" % (err, err.message, dbg))) + _mainloop.quit() + + def on_warning(self, _bus, message): + assert message.type == Gst.MessageType.WARNING + Gst.debug_bin_to_dot_file_with_ts( + self.source_pipeline, Gst.DebugGraphDetails.ALL, "WARNING") + err, dbg = message.parse_warning() + warn("Warning: %s: %s\n%s\n" % (err, err.message, dbg)) + + def on_eos_from_source_pipeline(self, _bus, _message): + if not self.tearing_down: + warn("Got EOS from source pipeline") + self.restart_source() + + def on_eos_from_sink_pipeline(self, _bus, _message): + debug("Got EOS") + _mainloop.quit() + + def on_underrun(self, _element): + if self.underrun_timeout: + ddebug("underrun: I 
already saw a recent underrun; ignoring") + else: + ddebug("underrun: scheduling 'restart_source' in 2s") + self.underrun_timeout = GObjectTimeout(2, self.restart_source) + self.underrun_timeout.start() + + def on_running(self, _element): + if self.underrun_timeout: + ddebug("running: cancelling underrun timer") + self.underrun_timeout.cancel() + self.underrun_timeout = None + else: + ddebug("running: no outstanding underrun timers; ignoring") + + def restart_source(self, *_args): + warn("Attempting to recover from video loss: " + "Stopping source pipeline and waiting 5s...") + self.source_pipeline.set_state(Gst.State.NULL) + self.source_pipeline = None + GObjectTimeout(5, self.start_source).start() + return False # stop the timeout from running again + + def start_source(self): + if self.tearing_down: + return False + warn("Restarting source pipeline...") + self.create_source_pipeline() + self.set_source_pipeline_playing() + warn("Restarted source pipeline") + if self.restart_source_enabled: + self.underrun_timeout.start() + return False # stop the timeout from running again + + @staticmethod + def appsink_await_eos(appsink, timeout=None): + done = threading.Event() + + def on_eos(_appsink): + done.set() + return True + hid = appsink.connect('eos', on_eos) + d = appsink.get_property('eos') or done.wait(timeout) + appsink.disconnect(hid) + return d + + def startup(self): + self.set_source_pipeline_playing() + self.sink_pipeline.set_state(Gst.State.PLAYING) + + self.mainloop_thread.start() + + def teardown(self): + self.tearing_down = True + self.source_pipeline, source = None, self.source_pipeline + if source: + for elem in gst_iterate(source.iterate_sources()): + elem.send_event(Gst.Event.new_eos()) # pylint: disable=E1120 + if not self.appsink_await_eos( + source.get_by_name('appsink'), timeout=10): + debug("teardown: Source pipeline did not teardown gracefully") + source.set_state(Gst.State.NULL) + source = None + if not self.novideo: + debug("teardown: Sending 
eos") + self.appsrc.emit("end-of-stream") + self.mainloop_thread.join(10) + debug("teardown: Exiting (GLib mainloop %s)" % ( + "is still alive!" if self.mainloop_thread.isAlive() else "ok")) + + +def _draw_text(numpy_image, text, origin, color): + (width, height), _ = cv2.getTextSize( + text, fontFace=cv2.FONT_HERSHEY_DUPLEX, fontScale=1.0, thickness=1) + cv2.rectangle( + numpy_image, (origin[0] - 2, origin[1] + 2), + (origin[0] + width + 2, origin[1] - height - 2), + thickness=cv2.cv.CV_FILLED, color=(0, 0, 0)) + cv2.putText( + numpy_image, text, origin, cv2.FONT_HERSHEY_DUPLEX, fontScale=1.0, + color=color) + + +def _draw_match(numpy_image, region, match_, thickness=3): + cv2.rectangle( + numpy_image, (region.x, region.y), (region.right, region.bottom), + (32, 0 if match_ else 255, 255), # bgr + thickness=thickness) + + +class GObjectTimeout(object): + """Responsible for setting a timeout in the GTK main loop.""" + def __init__(self, timeout_secs, handler, *args): + self.timeout_secs = timeout_secs + self.handler = handler + self.args = args + self.timeout_id = None + + def start(self): + self.timeout_id = GObject.timeout_add( + self.timeout_secs * 1000, self.handler, *self.args) + + def cancel(self): + if self.timeout_id: + GObject.source_remove(self.timeout_id) + self.timeout_id = None + + +_BGR_CAPS = Gst.Caps.from_string('video/x-raw,format=BGR') + + +def _match(image, template, match_parameters, template_name): + if any(image.shape[x] < template.shape[x] for x in (0, 1)): + raise ValueError("Source image must be larger than template image") + if any(template.shape[x] < 1 for x in (0, 1)): + raise ValueError("Template image must contain some data") + if template.shape[2] != 3: + raise ValueError("Template image must be 3 channel BGR") + if template.dtype != numpy.uint8: + raise ValueError("Template image must be 8-bits per channel") + + first_pass_matched, position, first_pass_certainty = _find_match( + image, template, match_parameters) + matched = ( + 
first_pass_matched and + _confirm_match(image, position, template, match_parameters)) + + region = Region(position.x, position.y, + template.shape[1], template.shape[0]) + + if logging.get_debug_level() > 1: + source_with_roi = image.copy() + _draw_match(source_with_roi, region, first_pass_matched, thickness=1) + _log_image( + source_with_roi, "source_with_roi", "stbt-debug/detect_match") + _log_image_descriptions( + template_name, matched, position, + first_pass_matched, first_pass_certainty, match_parameters) + + return matched, region, first_pass_certainty + + +def _find_match(image, template, match_parameters): + """Search for `template` in the entire `image`. + + This searches the entire image, so speed is more important than accuracy. + False positives are ok; we apply a second pass (`_confirm_match`) to weed + out false positives. + + http://docs.opencv.org/modules/imgproc/doc/object_detection.html + http://opencv-code.com/tutorials/fast-template-matching-with-image-pyramid + """ + + log = functools.partial(_log_image, directory="stbt-debug/detect_match") + log(image, "source") + log(template, "template") + ddebug("Original image %s, template %s" % (image.shape, template.shape)) + + levels = get_config("match", "pyramid_levels", type_=int) + if levels <= 0: + raise ConfigurationError("'match.pyramid_levels' must be > 0") + template_pyramid = _build_pyramid(template, levels) + image_pyramid = _build_pyramid(image, len(template_pyramid)) + roi_mask = None # Initial region of interest: The whole image. 
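The coarse-to-fine pyramid search that `_find_match` performs can be illustrated with a self-contained sketch. Everything below is a simplified, hypothetical stand-in for illustration only, not stb-tester's implementation: `build_pyramid` uses 2x2 mean-pooling in place of `cv2.pyrDown`, and `upsample` scales coordinates back to level 0 the way `_upsample` does.

```python
import numpy as np


def build_pyramid(image, levels):
    """Level 0 is the original image; each level halves both dimensions."""
    pyramid = [image]
    for _ in range(levels - 1):
        prev = pyramid[-1]
        h, w = prev.shape[0] // 2 * 2, prev.shape[1] // 2 * 2
        if h < 2 or w < 2:
            break
        # 2x2 mean pooling: a crude stand-in for cv2.pyrDown
        pooled = prev[:h, :w].reshape(h // 2, 2, w // 2, 2).mean(axis=(1, 3))
        pyramid.append(pooled)
    return pyramid


def upsample(pos, levels):
    """Map a (row, col) found at pyramid level `levels` back to level 0."""
    return (pos[0] * 2 ** levels, pos[1] * 2 ** levels)


img = np.zeros((64, 64))
img[40:44, 24:28] = 1.0  # a bright 4x4 patch at row 40, column 24
pyr = build_pyramid(img, 3)
assert [p.shape for p in pyr] == [(64, 64), (32, 32), (16, 16)]
# Locate the brightest pixel at the coarsest level, then scale back to
# level 0 (with up to 2**levels pixels of precision loss):
coarse = tuple(int(i) for i in
               np.unravel_index(np.argmax(pyr[2]), pyr[2].shape))
assert upsample(coarse, 2) == (40, 24)
```

The real code additionally thresholds each level's `matchTemplate` heatmap into a region-of-interest mask, so the next (larger) level only searches near candidate matches.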
+ + for level in reversed(range(len(template_pyramid))): + + matched, best_match_position, certainty, roi_mask = _match_template( + image_pyramid[level], template_pyramid[level], match_parameters, + roi_mask, level) + + if level == 0 or not matched: + return matched, _upsample(best_match_position, level), certainty + + +def _match_template(image, template, match_parameters, roi_mask, level): + + log = functools.partial(_log_image, directory="stbt-debug/detect_match") + log_prefix = "level%d-" % level + ddebug("Level %d: image %s, template %s" % ( + level, image.shape, template.shape)) + + method = { + 'sqdiff-normed': cv2.TM_SQDIFF_NORMED, + 'ccorr-normed': cv2.TM_CCORR_NORMED, + 'ccoeff-normed': cv2.TM_CCOEFF_NORMED, + }[match_parameters.match_method] + threshold = max( + 0, + match_parameters.match_threshold - (0.2 if level > 0 else 0)) + + matches_heatmap = ( + (numpy.ones if method == cv2.TM_SQDIFF_NORMED else numpy.zeros)( + (image.shape[0] - template.shape[0] + 1, + image.shape[1] - template.shape[1] + 1), + dtype=numpy.float32)) + + if roi_mask is None or any(x < 3 for x in roi_mask.shape): + rois = [ # Initial region of interest: The whole image. 
+ _Rect(0, 0, matches_heatmap.shape[1], matches_heatmap.shape[0])] + else: + roi_mask = cv2.pyrUp(roi_mask) + log(roi_mask, log_prefix + "roi_mask") + contours, _ = cv2.findContours( + roi_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) + rois = [ + _Rect(*cv2.boundingRect(x)) + # findContours ignores 1-pixel border of the image + .shift(Position(-1, -1)).expand(_Size(2, 2)) + for x in contours] + + if logging.get_debug_level() > 1: + source_with_rois = image.copy() + for roi in rois: + r = roi + t = _Size(*template.shape[:2]) + s = _Size(*source_with_rois.shape[:2]) + cv2.rectangle( + source_with_rois, + (max(0, r.x), max(0, r.y)), + (min(s.w - 1, r.x + r.w + t.w - 1), + min(s.h - 1, r.y + r.h + t.h - 1)), + (0, 255, 255), + thickness=1) + log(source_with_rois, log_prefix + "source_with_rois") + + for roi in rois: + r = roi.expand(_Size(*template.shape[:2])).shrink(_Size(1, 1)) + ddebug("Level %d: Searching in %s" % (level, roi)) + cv2.matchTemplate( + image[r.to_slice()], + template, + method, + matches_heatmap[roi.to_slice()]) + + log(image, log_prefix + "source") + log(template, log_prefix + "template") + log(matches_heatmap, log_prefix + "source_matchtemplate") + + min_value, max_value, min_location, max_location = cv2.minMaxLoc( + matches_heatmap) + if method == cv2.TM_SQDIFF_NORMED: + certainty = (1 - min_value) + best_match_position = Position(*min_location) + elif method in (cv2.TM_CCORR_NORMED, cv2.TM_CCOEFF_NORMED): + certainty = max_value + best_match_position = Position(*max_location) + else: + raise ValueError("Invalid matchTemplate method '%s'" % method) + + _, new_roi_mask = cv2.threshold( + matches_heatmap, + ((1 - threshold) if method == cv2.TM_SQDIFF_NORMED else threshold), + 255, + (cv2.THRESH_BINARY_INV if method == cv2.TM_SQDIFF_NORMED + else cv2.THRESH_BINARY)) + new_roi_mask = new_roi_mask.astype(numpy.uint8) + log(new_roi_mask, log_prefix + "source_matchtemplate_threshold") + + matched = certainty >= threshold + ddebug("Level %d: %s at %s 
with certainty %s" % ( + level, "Matched" if matched else "Didn't match", + best_match_position, certainty)) + return (matched, best_match_position, certainty, new_roi_mask) + + +def _build_pyramid(image, levels): + """A "pyramid" is [an image, the same image at 1/2 the size, at 1/4, ...] + + As a performance optimisation, image processing algorithms work on a + "pyramid" by first identifying regions of interest (ROIs) in the smallest + image; if results are positive, they proceed to the next larger image, etc. + See http://docs.opencv.org/doc/tutorials/imgproc/pyramids/pyramids.html + + The original-sized image is called "level 0", the next smaller image "level + 1", and so on. This numbering corresponds to the array index of the + "pyramid" array. + """ + pyramid = [image] + for _ in range(levels - 1): + if any(x < 20 for x in pyramid[-1].shape[:2]): + break + pyramid.append(cv2.pyrDown(pyramid[-1])) + return pyramid + + +def _upsample(position, levels): + """Convert position coordinates by the given number of pyramid levels. + + There is a loss of precision (unless ``levels`` is 0, in which case this + function is a no-op). + """ + return Position(position.x * 2 ** levels, position.y * 2 ** levels) + + +# Order of parameters consistent with ``cv2.boudingRect``. +class _Rect(namedtuple("_Rect", "x y w h")): + def expand(self, size): + return _Rect(self.x, self.y, self.w + size.w, self.h + size.h) + + def shrink(self, size): + return _Rect(self.x, self.y, self.w - size.w, self.h - size.h) + + def shift(self, position): + return _Rect(self.x + position.x, self.y + position.y, self.w, self.h) + + def to_slice(self): + """Return a 2-dimensional slice suitable for indexing a numpy array.""" + return (slice(self.y, self.y + self.h), slice(self.x, self.x + self.w)) + + +# Order of parameters consistent with OpenCV's ``numpy.ndarray.shape``. 
+class _Size(namedtuple("_Size", "h w")): + pass + + +def _confirm_match(image, position, template, match_parameters): + """Confirm that `template` matches `image` at `position`. + + This only checks `template` at a single position within `image`, so we can + afford to do more computationally-intensive checks than `_find_match`. + """ + + if match_parameters.confirm_method == "none": + return True + + log = functools.partial(_log_image, directory="stbt-debug/detect_match") + + # Set Region Of Interest to the "best match" location + roi = image[ + position.y:(position.y + template.shape[0]), + position.x:(position.x + template.shape[1])] + image_gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) + template_gray = cv2.cvtColor(template, cv2.COLOR_BGR2GRAY) + log(roi, "confirm-source_roi") + log(image_gray, "confirm-source_roi_gray") + log(template_gray, "confirm-template_gray") + + if match_parameters.confirm_method == "normed-absdiff": + cv2.normalize(image_gray, image_gray, 0, 255, cv2.NORM_MINMAX) + cv2.normalize(template_gray, template_gray, 0, 255, cv2.NORM_MINMAX) + log(image_gray, "confirm-source_roi_gray_normalized") + log(template_gray, "confirm-template_gray_normalized") + + absdiff = cv2.absdiff(image_gray, template_gray) + _, thresholded = cv2.threshold( + absdiff, int(match_parameters.confirm_threshold * 255), + 255, cv2.THRESH_BINARY) + eroded = cv2.erode( + thresholded, + cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3)), + iterations=match_parameters.erode_passes) + log(absdiff, "confirm-absdiff") + log(thresholded, "confirm-absdiff_threshold") + log(eroded, "confirm-absdiff_threshold_erode") + + return cv2.countNonZero(eroded) == 0 + + +_frame_number = 0 + + +def _log_image(image, name, directory): + if logging.get_debug_level() <= 1: + return + global _frame_number + if name == "source": + _frame_number += 1 + d = os.path.join(directory, "%05d" % _frame_number) + try: + utils.mkdir_p(d) + except OSError: + warn("Failed to create directory '%s'; won't 
save debug images." % d) + return + with _numpy_from_sample(image, readonly=True) as img: + if img.dtype == numpy.float32: + img = cv2.convertScaleAbs(img, alpha=255) + cv2.imwrite(os.path.join(d, name) + ".png", img) + + +def _log_image_descriptions( + template_name, matched, position, + first_pass_matched, first_pass_certainty, match_parameters): + """Create html file that describes the debug images.""" + + try: + import jinja2 + except ImportError: + warn( + "Not generating html guide to the image-processing debug images, " + "because python 'jinja2' module is not installed.") + return + + d = os.path.join("stbt-debug/detect_match", "%05d" % _frame_number) + + template = jinja2.Template(""" + + + + + + + +
+        <html lang="en">
+        <body>
+
+        <h4>
+            {{template_name}}
+            {{"matched" if matched else "didn't match"}}
+        </h4>
+
+        <p>Searching for template {{link("template")}}
+            within source {{link("source")}} image.</p>
+
+        {% for level in levels %}
+
+        <p>At level {{level}}:</p>
+        <ul>
+            <li>Searching for template {{link("template", level)}}
+                within source regions of interest
+                {{link("source_with_rois", level)}}.</li>
+            <li>OpenCV matchTemplate result
+                {{link("source_matchtemplate", level)}}
+                with method {{match_parameters.match_method}}
+                ({{"darkest" if match_parameters.match_method ==
+                        "sqdiff-normed" else "lightest"}}
+                pixel indicates position of best match).</li>
+            <li>matchTemplate result above match_threshold
+                {{link("source_matchtemplate_threshold", level)}}
+                of {{"%g"|format(match_parameters.match_threshold)}}
+                (white pixels indicate positions above the threshold).</li>
+            {% if (level == 0 and first_pass_matched) or level != min(levels) %}
+            <li>Matched at {{position}} {{link("source_with_roi")}}
+                with certainty {{"%.4f"|format(first_pass_certainty)}}.</li>
+            {% else %}
+            <li>Didn't match (best match at {{position}}
+                {{link("source_with_roi")}}
+                with certainty {{"%.4f"|format(first_pass_certainty)}}).</li>
+            {% endif %}
+        </ul>
+
+        {% endfor %}
+
+        {% if first_pass_certainty >= match_parameters.match_threshold %}
+
+        <p>Second pass (confirmation):</p>
+        <ul>
+            <li>Comparing template {{link("confirm-template_gray")}}
+                against source image's region of interest
+                {{link("confirm-source_roi_gray")}}.</li>
+            {% if match_parameters.confirm_method == "normed-absdiff" %}
+            <li>Normalised template
+                {{link("confirm-template_gray_normalized")}}
+                and source
+                {{link("confirm-source_roi_gray_normalized")}}.</li>
+            {% endif %}
+            <li>Absolute differences {{link("confirm-absdiff")}}.</li>
+            <li>Differences above confirm_threshold
+                {{link("confirm-absdiff_threshold")}}
+                of {{"%.2f"|format(match_parameters.confirm_threshold)}}.</li>
+            <li>After eroding
+                {{link("confirm-absdiff_threshold_erode")}}
+                {{match_parameters.erode_passes}}
+                {{"time" if match_parameters.erode_passes == 1
+                    else "times"}}:
+                {{"No" if matched else "Some"}}
+                differences (white pixels) remain, so the template
+                {{"does" if matched else "doesn't"}} match.</li>
+        </ul>
+
+        {% endif %}
+
+        <p>For further help please read
+            <em>stb-tester image matching parameters</em>.</p>
+
+        </body>
+        </html>
+ + + """) + + with open(os.path.join(d, "index.html"), "w") as f: + f.write(template.render( + first_pass_certainty=first_pass_certainty, + first_pass_matched=first_pass_matched, + levels=list(reversed(sorted(set( + [int(re.search(r"level(\d+)-.*", x).group(1)) + for x in glob.glob(os.path.join(d, "level*"))])))), + link=lambda s, level=None: ( + "" + .format("" if level is None else "level%d-" % level, s)), + match_parameters=match_parameters, + matched=matched, + min=min, + position=position, + template_name=template_name, + )) + + +def _find_path(image): + """Searches for the given filename and returns the full path. + + Searches in the directory of the script that called (for example) + detect_match, then in the directory of that script's caller, etc. + """ + + if os.path.isabs(image): + return image + + # stack()[0] is _find_path; + # stack()[1] is _find_path's caller, e.g. detect_match; + # stack()[2] is detect_match's caller (the user script). + for caller in inspect.stack()[2:]: + caller_image = os.path.join( + os.path.dirname(inspect.getframeinfo(caller[0]).filename), + image) + if os.path.isfile(caller_image): + return os.path.abspath(caller_image) + + # Fall back to image from cwd, for convenience of the selftests + return os.path.abspath(image) + + +def _load_mask(mask): + """Loads the given mask file and returns it as an OpenCV image.""" + mask_path = _find_path(mask) + debug("Using mask %s" % mask_path) + if not os.path.isfile(mask_path): + raise UITestError("No such mask file: %s" % mask) + mask_image = cv2.imread(mask_path, cv2.CV_LOAD_IMAGE_GRAYSCALE) + if mask_image is None: + raise UITestError("Failed to load mask file: %s" % mask_path) + return mask_image + + +# Tesseract sometimes has a hard job distinguishing certain glyphs such as +# ligatures and different forms of the same punctuation. We strip out this +# superfluous information improving matching accuracy with minimal effect on +# meaning. 
This means that stbt.ocr gives much more consistent results.
+_ocr_replacements = {
+    # Ligatures
+    u'ﬀ': u'ff',
+    u'ﬁ': u'fi',
+    u'ﬂ': u'fl',
+    u'ﬃ': u'ffi',
+    u'ﬄ': u'ffl',
+    u'ﬅ': u'ft',
+    u'ﬆ': u'st',
+    # Punctuation
+    u'“': u'"',
+    u'”': u'"',
+    u'‘': u'\'',
+    u'’': u'\'',
+    # These are actually different glyphs!:
+    u'‐': u'-',
+    u'‑': u'-',
+    u'‒': u'-',
+    u'–': u'-',
+    u'—': u'-',
+    u'―': u'-',
+}
+_ocr_transtab = dict((ord(amb), to) for amb, to in _ocr_replacements.items())
+
+
+def _find_tessdata_dir():
+    from distutils.spawn import find_executable
+
+    tessdata_prefix = os.environ.get("TESSDATA_PREFIX", None)
+    if tessdata_prefix:
+        tessdata = tessdata_prefix + '/tessdata'
+        if os.path.exists(tessdata):
+            return tessdata
+        else:
+            raise RuntimeError('Invalid TESSDATA_PREFIX: %s' % tessdata_prefix)
+
+    tess_prefix_share = os.path.normpath(
+        find_executable('tesseract') + '/../../share/')
+    for suffix in [
+            '/tessdata', '/tesseract-ocr/tessdata', '/tesseract/tessdata']:
+        if os.path.exists(tess_prefix_share + suffix):
+            return tess_prefix_share + suffix
+    raise RuntimeError('Installation error: Cannot locate tessdata directory')
+
+
+def _symlink_copy_dir(a, b):
+    """Behaves like `cp -rs` with GNU cp but is portable and doesn't require
+    execing another process. Tesseract requires files in the "tessdata"
+    directory to be modified to set config options. tessdata may be on a
+    read-only system directory so we use this to work around that limitation.
+    """
+    from os.path import basename, join, relpath
+    newroot = join(b, basename(a))
+    for dirpath, dirnames, filenames in os.walk(a):
+        for name in dirnames:
+            if name not in ['.', '..']:
+                rel = relpath(join(dirpath, name), a)
+                os.mkdir(join(newroot, rel))
+        for name in filenames:
+            rel = relpath(join(dirpath, name), a)
+            os.symlink(join(a, rel), join(newroot, rel))
+
+_memoise_tesseract_version = None
+
+
+def _tesseract_version(output=None):
+    r"""Different versions of tesseract have different bugs.
This function + allows us to tell the user if what they want isn't going to work. + + >>> (_tesseract_version('tesseract 3.03\n leptonica-1.70\n') > + ... _tesseract_version('tesseract 3.02\n')) + True + """ + global _memoise_tesseract_version + if output is None: + if _memoise_tesseract_version is None: + _memoise_tesseract_version = subprocess.check_output( + ['tesseract', '--version'], stderr=subprocess.STDOUT) + output = _memoise_tesseract_version + + line = [x for x in output.split('\n') if x.startswith('tesseract')][0] + return LooseVersion(line.split()[1]) + + +def _tesseract(frame, region, mode, lang, _config, + user_patterns=None, user_words=None): + + if _config is None: + _config = {} + + with _numpy_from_sample(frame, readonly=True) as f: + frame_region = Region(0, 0, f.shape[1], f.shape[0]) + intersection = Region.intersect(frame_region, region) + if intersection is None: + warn("Requested OCR in region %s which doesn't overlap with " + "the frame %s" % (str(region), frame_region)) + return ('', None) + else: + region = intersection + + # We scale image up 3x before feeding it to tesseract as this + # significantly reduces the error rate by more than 6x in tests. This + # uses bilinear interpolation which produces the best results. See + # http://stb-tester.com/blog/2014/04/14/improving-ocr-accuracy.html + outsize = (region.width * 3, region.height * 3) + subframe = cv2.resize(_crop(f, region), outsize, + interpolation=cv2.INTER_LINEAR) + + # $XDG_RUNTIME_DIR is likely to be on tmpfs: + tmpdir = os.environ.get("XDG_RUNTIME_DIR", None) + + # The second argument to tesseract is "output base" which is a filename to + # which tesseract will append an extension. Unfortunately this filename + # isn't easy to predict in advance across different versions of tesseract. + # If you give it "hello" the output will be written to "hello.txt", but in + # hOCR mode it will be "hello.html" (tesseract 3.02) or "hello.hocr" + # (tesseract 3.03). 
We work around this with a temporary directory: + with utils.named_temporary_directory(prefix='stbt-ocr-', dir=tmpdir) as tmp: + outdir = tmp + '/output' + os.mkdir(outdir) + + cmd = ["tesseract", '-l', lang, tmp + '/input.png', + outdir + '/output', "-psm", str(int(mode))] + + tessenv = os.environ.copy() + + if _config or user_words or user_patterns: + tessdata_dir = tmp + '/tessdata' + os.mkdir(tessdata_dir) + _symlink_copy_dir(_find_tessdata_dir(), tmp) + tessenv['TESSDATA_PREFIX'] = tmp + '/' + + if user_words: + if 'user_words_suffix' in _config: + raise ValueError( + "You cannot specify 'user_words' and " + + "'_config[\"user_words_suffix\"]' at the same time") + with open('%s/%s.user-words' % (tessdata_dir, lang), 'w') as f: + f.write('\n'.join(user_words).encode('utf-8')) + _config['user_words_suffix'] = 'user-words' + + if user_patterns: + if 'user_patterns_suffix' in _config: + raise ValueError( + "You cannot specify 'user_patterns' and " + + "'_config[\"user_patterns_suffix\"]' at the same time") + if _tesseract_version() < LooseVersion('3.03'): + raise RuntimeError( + 'tesseract version >=3.03 is required for user_patterns. 
' + 'version %s is currently installed' % _tesseract_version()) + with open('%s/%s.user-patterns' % (tessdata_dir, lang), 'w') as f: + f.write('\n'.join(user_patterns).encode('utf-8')) + _config['user_patterns_suffix'] = 'user-patterns' + + if _config: + with open(tessdata_dir + '/configs/stbtester', 'w') as cfg: + for k, v in _config.iteritems(): + if isinstance(v, bool): + cfg.write(('%s %s\n' % (k, 'T' if v else 'F'))) + else: + cfg.write((u"%s %s\n" % (k, unicode(v))) + .encode('utf-8')) + cmd += ['stbtester'] + + cv2.imwrite(tmp + '/input.png', subframe) + subprocess.check_output(cmd, stderr=subprocess.STDOUT, env=tessenv) + with open(outdir + '/' + os.listdir(outdir)[0], 'r') as outfile: + return (outfile.read(), region) + + +def _hocr_iterate(hocr): + started = False + need_space = False + for elem in hocr.iterdescendants(): + if elem.tag == '{http://www.w3.org/1999/xhtml}p' and started: + yield (u'\n', elem) + need_space = False + if elem.tag == '{http://www.w3.org/1999/xhtml}span' and \ + 'ocr_line' in elem.get('class').split() and started: + yield (u'\n', elem) + need_space = False + for e, t in [(elem, elem.text), (elem.getparent(), elem.tail)]: + if t: + if t.strip(): + if need_space and started: + yield (u' ', None) + need_space = False + yield (unicode(t).strip(), e) + started = True + else: + need_space = True + + +def _hocr_find_phrase(hocr, phrase): + l = list(_hocr_iterate(hocr)) + words_only = [(w, elem) for w, elem in l if w.strip() != u''] + + # Dumb and poor algorithmic complexity but succint and simple + if len(phrase) <= len(words_only): + for x in range(0, len(words_only)): + sublist = words_only[x:x + len(phrase)] + if all(w[0].lower() == p.lower() for w, p in zip(sublist, phrase)): + return sublist + return None + + +def _hocr_elem_region(elem): + while elem is not None: + m = re.search(r'bbox (\d+) (\d+) (\d+) (\d+)', elem.get('title') or u'') + if m: + extents = [int(x) for x in m.groups()] + return Region.from_extents(*extents) + elem 
= elem.getparent() + +# Tests +# =========================================================================== + + +def test_wait_for_motion_half_motion_str_2of4(): + with _fake_frames_at_half_motion() as dut: + dut.wait_for_motion(consecutive_frames='2/4') + + +def test_wait_for_motion_half_motion_str_2of3(): + with _fake_frames_at_half_motion() as dut: + dut.wait_for_motion(consecutive_frames='2/3') + + +def test_wait_for_motion_half_motion_str_3of4(): + with _fake_frames_at_half_motion() as dut: + try: + dut.wait_for_motion(consecutive_frames='3/4') + assert False, "wait_for_motion succeeded unexpectedly" + except MotionTimeout: + pass + + +def test_wait_for_motion_half_motion_int(): + with _fake_frames_at_half_motion() as dut: + try: + dut.wait_for_motion(consecutive_frames=2) + assert False, "wait_for_motion succeeded unexpectedly" + except MotionTimeout: + pass + + +@contextmanager +def _fake_frames_at_half_motion(): + class FakeDisplay(object): + def gst_samples(self, _timeout_secs=10): + data = [ + numpy.zeros((2, 2, 3), dtype=numpy.uint8), + numpy.ones((2, 2, 3), dtype=numpy.uint8) * 255, + ] + for i in range(10): + buf = Gst.Buffer.new_wrapped(data[(i // 2) % 2].flatten()) + buf.pts = i * 1000000000 + yield _gst_sample_make_writable( + Gst.Sample.new(buf, Gst.Caps.from_string( + 'video/x-raw,format=BGR,width=2,height=2'), None, None)) + + dut = DeviceUnderTest(display=FakeDisplay()) + dut.get_frame = lambda: None + yield dut + + +def test_ocr_on_static_images(): + for image, expected_text, region, mode in [ + # pylint: disable=C0301 + ("Connection-status--white-on-dark-blue.png", "Connection status: Connected", None, None), + ("Connection-status--white-on-dark-blue.png", "Connected", Region(x=210, y=0, width=120, height=40), None), + ("programme--white-on-black.png", "programme", None, None), + ("UJJM--white-text-on-grey-boxes.png", "", None, None), + ("UJJM--white-text-on-grey-boxes.png", "UJJM", None, OcrMode.SINGLE_LINE), + ]: + kwargs = {"region": 
region}
+        if mode is not None:
+            kwargs["mode"] = mode
+        text = DeviceUnderTest().ocr(
+            cv2.imread(os.path.join(
+                os.path.dirname(__file__), "..", "tests", "ocr", image)),
+            **kwargs)
+        assert text == expected_text, (
+            "Unexpected text. Expected '%s'. Got: %s" % (expected_text, text))
diff -Nru stb-tester-22/_stbt/logging.py stb-tester-23-1-gf70a21c/_stbt/logging.py
--- stb-tester-22/_stbt/logging.py	2015-03-27 13:35:10.000000000 +0000
+++ stb-tester-23-1-gf70a21c/_stbt/logging.py	2015-07-08 17:05:05.000000000 +0000
@@ -62,6 +62,12 @@
         help='Enable debug output (specify twice to enable GStreamer element '
              'dumps to ./stbt-debug directory)')
 
+    argparser.add_argument(
+        '--structured-logging', metavar="FILENAME", default=None,
+        help="Writes structured logging data to the given filename. The "
+             "format of the data is newline-delimited JSON objects with xz "
+             "compression applied")
+
 
 def test_that_debug_can_write_unicode_strings():
     def test(level):
diff -Nru stb-tester-22/_stbt/power.py stb-tester-23-1-gf70a21c/_stbt/power.py
--- stb-tester-22/_stbt/power.py	2015-03-27 13:35:10.000000000 +0000
+++ stb-tester-23-1-gf70a21c/_stbt/power.py	2015-07-08 17:05:05.000000000 +0000
@@ -3,6 +3,7 @@
 import errno
 import os
 import re
+import time
 
 from _stbt.config import ConfigurationError
 
@@ -11,7 +12,8 @@
     remotes = [
         (r'none', _NoOutlet),
         (r'file:(?P<filename>[^:]+)', _FileOutlet),
-        (r'(?P<model>pdu|ipp|aten|testfallback):(?P<hostname>[^: ]+)'
+        (r'aten:(?P<hostname>[^: ]+):(?P<outlet>[^: ]+)', _ATEN_PE6108G),
+        (r'(?P<model>pdu|ipp|testfallback):(?P<hostname>[^: ]+)'
          ':(?P<outlet>[^: ]+)', _ShellOutlet),
         (r'aviosys-8800-pro(:(?P<filename>[^:]+))?', _new_aviosys_8800_pro),
     ]
@@ -191,3 +193,63 @@
         self.respond('z>')
         return len(data)
+
+
+class _ATEN_PE6108G(object):
+    """Class to control the ATEN PDU using the pysnmp module.
""" + + def __init__(self, hostname, outlet): + self.hostname = hostname + self.outlet = int(outlet) + oid_string = "1.3.6.1.4.1.21317.1.3.2.2.2.2.{0}.0" + self.outlet_oid = oid_string.format(self.outlet + 1) + + def set(self, power): + new_state = self.aten_cmd(power=power) + + # ATEN PE6108G outlets take between 4-8 seconds to power on + for _ in range(12): + time.sleep(1) + if self.aten_cmd() == new_state: + return + raise RuntimeError( + "Timeout waiting for outlet to power {}".format( + "ON" if power else "OFF")) + + def get(self): + result = self.aten_cmd() + # 3 represents moving between states + return {3: False, 2: True, 1: False}[int(result)] + + def aten_cmd(self, power=None): + from pysnmp.entity.rfc3413.oneliner import cmdgen + from pysnmp.proto.rfc1905 import NoSuchObject + from pysnmp.proto.rfc1902 import Integer + + command_generator = cmdgen.CommandGenerator() + + if power is None: # `status` command + error_ind, _, _, var_binds = command_generator.getCmd( + cmdgen.CommunityData('administrator'), + cmdgen.UdpTransportTarget((self.hostname, 161)), + self.outlet_oid) + else: + error_ind, _, _, var_binds = command_generator.setCmd( + cmdgen.CommunityData('administrator'), + cmdgen.UdpTransportTarget((self.hostname, 161)), + (self.outlet_oid, Integer(2 if power else 1))) + + if error_ind is not None: + raise RuntimeError("SNMP Error ({})".format(error_ind)) + + name, result = var_binds[0] + + assert str(name) == self.outlet_oid + + if isinstance(result, NoSuchObject): + raise RuntimeError("Invalid outlet {}".format(self.outlet)) + + if not isinstance(result, Integer): + raise RuntimeError("Unexpected result ({})".format(result)) + + return result diff -Nru stb-tester-22/_stbt/state_watch.py stb-tester-23-1-gf70a21c/_stbt/state_watch.py --- stb-tester-22/_stbt/state_watch.py 1970-01-01 00:00:00.000000000 +0000 +++ stb-tester-23-1-gf70a21c/_stbt/state_watch.py 2015-07-08 17:05:05.000000000 +0000 @@ -0,0 +1,240 @@ +import datetime +import json +import os 
+import socket
+import sys
+from cStringIO import StringIO
+
+
+class StateSender(object):
+    """
+    A test run is in a particular state. This state includes what test is
+    currently executing, what line is currently executing, etc. This state
+    needs to be communicated live to the UI and should be useful during test
+    replay when investigating what has gone wrong.
+
+    The data structure is like:
+
+        {
+            "test_run": {
+                "current_line": {
+                    "file": "tests/my_file.py",
+                    "line": 123,
+                },
+                "test_case": {
+                    "name": "tests/my_file.py::test_that_this_rocks",
+                    "file": "tests/my_file.py",
+                    "function": "test_that_this_rocks",
+                    "line": 87
+                }
+            }
+        }
+
+    Conceptually the data-structure lives in the test-pack. It is ephemeral and
+    only exists while the test job is executing. Changes to this data structure
+    are serialised and sent over a socket to the UI so the current state of the
+    system can be displayed. The serialisation includes timestamps such that
+    the state of the system can be inspected and replayed as part of the test
+    result.
+
+    The change serialisation format is \\r\\n separated JSON dictionaries.
+    Here's an example entry pretty-printed for clarity's sake (in reality it
+    would be on a single line):
+
+        {
+            "state_change": {
+                "time": "2014-11-28T20:55:26.092343Z",
+                "changes": {
+                    "test_run.current_line": {
+                        "file": "tests/my_file.py",
+                        "line": 654
+                    }
+                }
+            }
+        }
+
+    This record indicates that the current line changed to line 654 of file
+    "tests/my_file.py" at 20:55:26.092343 UTC on 2014-11-28. Notes:
+
+    * The root object has a single key "state_change". This allows extensions
+      in the future for other types of messages to be sent over the same
+      protocol.
+
+    * The state_change message has two keys:
+
+        * "time" which is the time of the change as an ISO8601 formatted string
+          e.g `"2014-11-28T20:55:26.092343Z"`.
+
+        * "changes" is a dictionary of changes made to the state of the system.
+          The keys identify the value that is changing, and the values are the
+          new value of the respective key. The key is a dot-separated string
+          identifying the subtree of the hierarchical data structure that
+          should be replaced. e.g:
+
+              "test_run.current_line": {"cow": "moo"}
+
+          means:
+
+              data["test_run"]["current_line"] = {"cow": "moo"}
+
+          The fact that multiple values may be replaced in one message allows
+          atomic changes to happen to the hierarchy.
+    """
+    def __init__(self, file_):
+        self._file = file_
+
+    def set(self, items, time=None):
+        """
+        >>> sw = StateSender(StringIO())
+        >>> sw.set({"animals.noises": {"cow": "moo"}})
+        """
+        if time is None:
+            time = datetime.datetime.now()
+        message = {
+            "state_change": {
+                "time": time.isoformat(),
+                "changes": items
+            }
+        }
+        self._file.write(json.dumps(message, sort_keys=True) + '\r\n')
+
+    def close(self):
+        self._file.close()
+        self._file = None
+
+    def log_test_starting(self, name, file_, function, line):
+        self.set({"test_run": {
+            "current_line": {},
+            "test_case": {
+                "name": name,
+                "file": file_,
+                "function": function,
+                "line": line
+            }}})
+
+    def log_test_ended(self):
+        self.set({"test_run": {}})
+
+    def log_current_line(self, file_, line):
+        self.set({"test_run.current_line": {"file": file_, "line": line}})
+
+
+def test_state_changes():
+    f = StringIO()
+    sw = StateSender(f)
+    sw.set({"test_run.line_number": 23},
+           time=datetime.datetime(2014, 3, 4, 12, 45, 12))
+    assert f.getvalue() == (
+        '{"state_change": {"changes": {"test_run.line_number": 23}, '
+        '"time": "2014-03-04T12:45:12"}}\r\n')
+
+
+class _SocketAndFileWriter(object):
+    def __init__(self, socket_, file_):
+        self.file = file_
+        self.socket = socket_
+
+    def write(self, data):
+        self.file.write(data)
+        self.socket.sendall(data)
+
+    def close(self):
+        self.file.close()
+        self.socket.shutdown(socket.SHUT_RDWR)
+        self.socket.close()
+
+
+class _NullFile(object):
+    def write(self, data):
+        pass
+
+    def close(self):
+        pass
+
+
+def
new_state_sender(filename=None): + from lzma import LZMAFile + socket_ = None + if filename is not None: + fsfile_ = LZMAFile(filename, 'wb') + else: + fsfile_ = _NullFile() + file_ = None + try: + socket_ = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + socket_.connect(os.environ['STBT_TRACING_SOCKET']) + file_ = _SocketAndFileWriter(socket_, fsfile_) + except (KeyError, socket.error): + file_ = fsfile_ + return StateSender(file_) + + +def _set_heir(data, key, value): + assert len(key) > 0 + if len(key) == 1: + data[key[0]] = value + else: + _set_heir(data[key[0]], key[1:], value) + + +class StateReceiver(object): + def __init__(self, state=None): + if state is None: + state = {} + self.state = state + self.olddata = "" + + def write(self, data): + olddata, self.olddata = self.olddata, "" + buf = StringIO(olddata + data) + + for line in buf: + if not line.endswith('\n'): + # EOF (for now) indicating incomplete line + self.olddata = line + return + + try: + (msg_type, value), = json.loads(line).items() + if msg_type != 'state_change': + return + + for k, v in sorted(value['changes'].items(), + key=lambda x: len(x[0])): + _set_heir(self.state, k.split('.'), v) + except StandardError as e: + sys.stderr.write( + "Error processing state change: %s" % str(e)) + + +def test_statereceiver(): + data = {} + sr = StateReceiver(data) + sr.write( + '{"state_change": {"changes": {"test": 5, ' + '"test2": {"cat": "miaw"}}}}\r\n') + assert data == {"test": 5, "test2": {"cat": "miaw"}} + sr.write( + '{"state_change": {"changes": {"test": 8, ' + '"test3": {"dog": "woof"}}}}\r\n') + assert data == {"test": 8, "test2": {"cat": "miaw"}, + "test3": {"dog": "woof"}} + + # Incomplete write: no change: + sr.write('{"state_change": {"changes": {"te') + assert data == {"test": 8, "test2": {"cat": "miaw"}, + "test3": {"dog": "woof"}} + + # and finish that write: + sr.write('st": 12, "test3": {"dog": "baa"}}}}\r\n') + assert data == {"test": 12, "test2": {"cat": "miaw"}, + "test3": 
{"dog": "baa"}} + + +def test_that_statesender_is_symmetrical_with_statereceiver(): + out = {} + sr = StateReceiver(out) + ss = StateSender(sr) + + ss.set({"names": ["Arnold", "Cat", "Dave", "Kryten"]}) + assert out['names'] == ["Arnold", "Cat", "Dave", "Kryten"] diff -Nru stb-tester-22/_stbt/stbt-power.sh stb-tester-23-1-gf70a21c/_stbt/stbt-power.sh --- stb-tester-22/_stbt/stbt-power.sh 2015-03-27 13:35:10.000000000 +0000 +++ stb-tester-23-1-gf70a21c/_stbt/stbt-power.sh 2015-07-08 17:05:05.000000000 +0000 @@ -17,8 +17,7 @@ #/ --power-outlet #/ Address of the power device and the outlet on the device. #/ The format of is: (ipp|pdu):: -#/ aten|ipp|pdu Model of the controllable power supply: -#/ * aten: ATEN (all models) +#/ ipp|pdu Model of the controllable power supply: #/ * ipp: IP Power 9258 #/ * pdu: PDUeX KWX #/ The device's network address. @@ -62,7 +61,7 @@ } uri() { - local regex='^(?pdu|ipp|aten|testfallback):(?[^: ]+)(:(?[^: ]+))?$' + local regex='^(?pdu|ipp|testfallback):(?[^: ]+)(:(?[^: ]+))?$' echo "$2" | perl -ne \ "if (/$regex/) { print $+{$1} ? 
$+{$1} : ''; } else { exit 1; }" @@ -79,36 +78,6 @@ esac } -aten() { - local command=$1 hostname=$2 outlet=$3 status - aten_command $command $hostname $outlet - if [[ $command =~ ^(on|off)$ ]]; then - # ATEN PE6108G outlets take between 4-8 seconds to power on - for _ in {1..12}; do - sleep 1 - status=$(aten_command status $hostname $outlet) - [[ "${status,,}" == $command ]] && return - done - die "timed out waiting for outlet to power $command" - fi -} -aten_command() { - local command=$1 hostname=$2 outlet=$3 snmp_command - local outlet_oid="enterprises.21317.1.3.2.2.2.2.$((1 + outlet)).0" - local snmp_param="-Oq -v 2c -c administrator $hostname $outlet_oid" - - case $command in - on) snmp_command="snmpset $snmp_param int 2" ;; - off) snmp_command="snmpset $snmp_param int 1" ;; - status) snmp_command="snmpget $snmp_param" ;; - esac - output="$($snmp_command)" || die "failed to connect to '$hostname'" - echo "$output" | grep -q 'No Such Object available' && - die "invalid outlet '$outlet' or unsupported device" - [[ $command == status ]] && - echo "$output" | cut -d' ' -f2 | sed -e 's/1/OFF/' -e 's/2/ON/' -} - ipp() { local command=$1 hostname=$2 outlet="$3" output diff -Nru stb-tester-22/_stbt/tv_driver.py stb-tester-23-1-gf70a21c/_stbt/tv_driver.py --- stb-tester-22/_stbt/tv_driver.py 2015-03-27 13:35:10.000000000 +0000 +++ stb-tester-23-1-gf70a21c/_stbt/tv_driver.py 2015-07-08 17:05:05.000000000 +0000 @@ -163,14 +163,17 @@ def create_from_args(args, video_generator): desc = args.tv_driver - video_server = _HTTPVideoServer( - video_generator, - video_format=get_config('camera', 'video_format')) + + def make_video_server(): + return _HTTPVideoServer( + video_generator, + video_format=get_config('camera', 'video_format')) + if desc == 'assume': return _AssumeTvDriver() elif desc.startswith('fake:'): - return _FakeTvDriver(desc[5:], video_server) + return _FakeTvDriver(desc[5:], make_video_server()) elif desc == 'manual': - return _ManualTvDriver(video_server) + 
return _ManualTvDriver(make_video_server()) else: raise RuntimeError("Unknown video driver requested: %s" % desc) diff -Nru stb-tester-22/stbt-batch stb-tester-23-1-gf70a21c/stbt-batch --- stb-tester-22/stbt-batch 2015-03-27 13:35:10.000000000 +0000 +++ stb-tester-23-1-gf70a21c/stbt-batch 2015-07-08 17:05:05.000000000 +0000 @@ -16,11 +16,15 @@ usage() { grep '^#/' "$0" | cut -c4-; } [ $# -ge 1 ] || { usage >&2; exit 1; } -case "$1" in +cmd="$1" +shift +case "$cmd" in -h|--help) usage; exit 0;; - run|report|instaweb) - exec "$(dirname "$0")"/stbt-batch.d/"$@";; + run) + exec "$(dirname "$0")"/stbt-batch.d/run.py "$@";; + report|instaweb) + exec "$(dirname "$0")"/stbt-batch.d/$cmd "$@";; *) usage >&2; exit 1;; esac diff -Nru stb-tester-22/stbt-batch.d/run stb-tester-23-1-gf70a21c/stbt-batch.d/run --- stb-tester-22/stbt-batch.d/run 2015-03-27 13:35:10.000000000 +0000 +++ stb-tester-23-1-gf70a21c/stbt-batch.d/run 1970-01-01 00:00:00.000000000 +0000 @@ -1,302 +0,0 @@ -#!/usr/bin/env bash -# -*- sh-basic-offset: 2 -*- - -# Copyright 2013 YouView TV Ltd. -# License: LGPL v2.1 or (at your option) any later version (see -# https://github.com/stb-tester/stb-tester/blob/master/LICENSE for details). - -#/ Usage: -#/ stbt batch run [options] test.py [test.py ...] -#/ stbt batch run [options] test.py arg [arg ...] -- test.py arg [arg ...] [-- ...] -#/ -#/ Options: -#/ -1 Run once. The default behaviour is to run the test -#/ repeatedly as long as it passes. -#/ -k Continue running after "uninteresting" failures. -#/ -kk Continue running after any failure (except those -#/ that would prevent any further test from passing). -#/ -#/ -d Enable "stbt-debug" dump of intermediate images. -#/ -v Verbose. Print stbt standard output. -#/ -vv Extra verbose. Print stbt standard error output. -#/ -#/ -o Output directory to save the report and test-run -#/ logs under (defaults to the current directory). 
-#/ -t Tag to add to test run directory names (useful -#/ to differentiate directories when you intend to -#/ merge test results from multiple machines). - -usage() { grep '^#/' "$0" | cut -c4-; } -die() { echo "$(basename "$0"): error: $*" >&2; exit 1; } - -main() { - runner=$(dirname "$(realpath "$0")") - export PYTHONUNBUFFERED=x - - keep_going=0 - outputdir="$PWD" - run_once=false - stop=false - tag= - v=-v - verbose=0 - failure_count=0 - while getopts ":1dhko:t:v" option; do - case $option in - 1) run_once=true;; - d) v=-vv;; - h) usage; exit 0;; - k) keep_going=$((keep_going + 1));; - o) outputdir=$(realpath "$OPTARG");; - t) tag=-$OPTARG;; - v) verbose=$((verbose + 1));; - *) die "Invalid option '-$OPTARG'. Use '-h' for help.";; - esac - done - shift $((OPTIND - 1)) - [[ $# -gt 0 ]] || { usage >&2; exit 1; } - - which ts &>/dev/null || - die "No 'ts' command found; please install 'moreutils' package" - - exec 3>/dev/null 4>/dev/null - [ $verbose -gt 0 ] && exec 3>&1 - [ $verbose -gt 1 ] && exec 4>&1 - - run_count=0 - while true; do - while IFS=$'\t' read -a test; do - run_count=$((run_count+1)) - run "${test[@]}" /dev/null && - rundir=$(date +%Y-%m-%d_%H.%M.%S)"$tag" && - mkdir "$rundir" && - rm -f current"$tag" && ln -s "$rundir" current"$tag" && - cd "$rundir" && - tmpdir="$(mktemp -dt stbt-batch.XXX)" && - mkfifo "$tmpdir"/rawout "$tmpdir"/rawerr || - die "Failed to set up test-run directory '$outputdir/$rundir'." - - [ -n "$tag" ] && echo "Tag ${tag#-}" > extra-columns - - ( cd "$(dirname "$testpath")" && - git describe --always --dirty 2>/dev/null - ) > git-commit || rm -f git-commit - - ( cd "$(dirname "$testpath")" && - gitdir=$(dirname "$(realpath "$(git rev-parse --git-dir 2>/dev/null)")") && - echo "${testpath#$gitdir/}" || echo "$testpath" - ) > test-name - - printf "%s\n" "$@" > test-args - - "$runner"/report --html-only . >/dev/null - - user_command pre_run start - - [ $verbose -gt 0 ] && printf "\n$test $*...\n" || printf "$test $*... 
" - "$runner"/../stbt-run $v --save-video "video.webm" "$testpath" -- "$@" \ - >"$tmpdir"/rawout 2>"$tmpdir"/rawerr & - stbtpid=$! - local start_time=$(date +%s) - ts '[%Y-%m-%d %H:%M:%.S %z] ' < "$tmpdir"/rawout | tee stdout.log >&3 & - ts '[%Y-%m-%d %H:%M:%.S %z] ' < "$tmpdir"/rawerr | tee stderr.log >&4 & - - while true; do - interrupted=false - wait $stbtpid - exit_status=$? - $interrupted || break - done - - [[ $exit_status -eq 0 ]] && echo OK || echo FAILED - - # Data that must be collected ASAP - echo $(( $(date +%s) - $start_time )) > duration - which sensors &>/dev/null && sensors &> sensors.log - [[ -f screenshot.png ]] || "$runner"/../stbt-screenshot &>/dev/null - echo $exit_status > exit-status - - cat <<-EOF | python && - import cv2 - im = cv2.imread('screenshot.png') - if im is not None: - cv2.imwrite( - 'thumbnail.jpg', - cv2.resize(im, (640, 640 * im.shape[0] // im.shape[1])), - [cv2.cv.CV_IMWRITE_JPEG_QUALITY, 50]) - EOF - [[ $exit_status -eq 0 ]] && rm -f screenshot.png - - user_command post_run stop - - rm "$tmpdir"/rawout "$tmpdir"/rawerr - rmdir "$tmpdir" - echo "$STBT_VERSION" > stbt-version.log - grep -q "FAIL: .*: MatchTimeout" stdout.log && template - [ -f core* ] && backtrace core* - "$runner"/report --classify-only . >/dev/null - grep -q "FAIL: .*: NoVideo" stdout.log && { - check_capture_hardware || stop=true; } - - if [[ $exit_status -ne 0 ]]; then - user_command recover || stop=true - fi - - "$runner"/report --html-only . >/dev/null - - cd .. - rm -f latest"$tag"; ln -s "$rundir" latest"$tag" - popd >/dev/null - return $exit_status -} - -trap on_kill1 sigint sigterm -on_kill1() { - printf "\nReceived interrupt; waiting for current test to complete.\n" >&2 - interrupted=true stop=true - trap on_kill2 sigint sigterm -} -on_kill2() { - echo "Received interrupt; exiting." 
>&2 - interrupted=true stop=true - killtree $stbtpid -} - -should_i_continue() { - $stop && return 1; - [[ $exit_status -eq 0 ]] || - # "Uninteresting" failures due to the test infrastructure - [[ $keep_going -gt 0 && $exit_status -gt 1 ]] || - # Failures due to the system under test - [[ $keep_going -gt 1 ]] -} - -template() { - local template=$( - sed -n 's,^.*stbt-run: Searching for \(.*\.png\)$,\1,p' stderr.log | - tail -1) - [ -f "$template" ] && cp "$template" template.png -} - -backtrace() { - local gdbcommand corefile=$1 - gdbcommand=$(mktemp -t report.XXX) || die "Failed to create temp file" - echo "thread apply all bt" > $gdbcommand - gdb $(which python) $corefile -batch -x $gdbcommand &> backtrace.log - rm -f $gdbcommand -} - -user_command() { - local c=$("$runner"/../stbt-config batch.$1 2>/dev/null) - [[ -z "$c" ]] && return - "$c" $2 &1 - ) | ts '[%Y-%m-%d %H:%M:%.S %z] ' > decklinksrc.log - - if grep -q "enable video input failed" decklinksrc.log; then - local subdevice=$( - "$runner"/../stbt-config global.source_pipeline | - grep -o device-number=. | awk -F= '{print $2}') - local users=$( - lsof -F Lnc \ - /dev/blackmagic${subdevice:-0} \ - /dev/blackmagic/dv${subdevice:-0} \ - 2>/dev/null | - # Example `lsof` output: - # p70752 - # cgst-launch-0.10 - # Lstb-tester - # n/dev/blackmagic0 - awk '/^p/ { printf "\n" } - { sub(/^./, ""); printf $0 " " }') - echo "Blackmagic card in use: $users" > failure-reason - cp failure-reason failure-reason.manual - echo "Blackmagic card in use; exiting." - return 1 - - # Even if the card has no video connected to its input you see - # "VideoInputFrameArrived: Frame received - No input signal detected" - elif ! grep -q VideoInputFrameArrived decklinksrc.log; then - echo "Blackmagic card froze" > failure-reason - cp failure-reason failure-reason.manual - echo "Blackmagic card froze; exiting." 
- return 1 - fi - ;; - esac -} - -# Input: -# test1.py arg1 arg2 -- test2.py arg -- test3.py -# Output (suitable as input to `IFS=$'\t' read -a`): -# test1.py\targ1\targ2 -# test2.py\targ -# test3.py -parse_test_args() { - if [[ "$*" =~ -- ]]; then - [[ "$1" == -- ]] && shift - while [[ $# -gt 0 ]]; do - printf "%s" "$1"; shift - while [[ $# -gt 0 && "$1" != -- ]]; do printf "\t%s" "$1"; shift; done - printf "\n" - [[ "$1" == -- ]] && shift - done - else # No "--": Each command-line argument is a test script. - printf "%s\n" "$@" - fi -} - -# http://stackoverflow.com/questions/392022/best-way-to-kill-all-child-processes -killtree() { - local parent=$1 child - for child in $(ps -o ppid= -o pid= | awk "\$1==$parent {print \$2}"); do - killtree $child - done - kill $parent -} - -# Portable implementation of GNU "readlink -f" to support BSD/OSX. -realpath() { - python -c 'import os, sys; print os.path.realpath(sys.argv[1])' "$1" -} - -main "$@" diff -Nru stb-tester-22/stbt-batch.d/run-one stb-tester-23-1-gf70a21c/stbt-batch.d/run-one --- stb-tester-22/stbt-batch.d/run-one 1970-01-01 00:00:00.000000000 +0000 +++ stb-tester-23-1-gf70a21c/stbt-batch.d/run-one 2015-07-08 17:05:05.000000000 +0000 @@ -0,0 +1,219 @@ +#!/usr/bin/env bash +# -*- sh-basic-offset: 2 -*- + +# Copyright 2013 YouView TV Ltd. +# 2013-2015 stb-tester.com Ltd. +# License: LGPL v2.1 or (at your option) any later version (see +# https://github.com/stb-tester/stb-tester/blob/master/LICENSE for details). +# +# Input command-line arguments: +# +# * testname [args...] 
+# +# Input environment variables: +# +# * $tag +# * $v +# * $verbose +# * $outputdir +# +# Outputs: +# +# * A test-run directory under $outputdir +# * $outputdir/latest pointing to this directory +# +# IPC: +# +# * SIGTERM signal says stop this test +# + + +die() { echo "$(basename "$0"): error: $*" >&2; exit 2; } + +main() { + runner=$(dirname "$(realpath "$0")") + local test testpath tmpdir rundir + + test="$1" && shift && + testpath=$(realpath "$test") && + mkdir -p "$outputdir" && + pushd "$outputdir" >/dev/null && + rundir=$(date +%Y-%m-%d_%H.%M.%S)"$tag" && + mkdir "$rundir" && + send_state_change active_results_directory "\"$outputdir/$rundir\"" && + rm -f current"$tag" && ln -s "$rundir" current"$tag" && + cd "$rundir" && + tmpdir="$(mktemp -dt stbt-batch.XXX)" && + mkfifo "$tmpdir"/rawout "$tmpdir"/rawerr || + die "Failed to set up test-run directory '$outputdir/$rundir'." + + [ -n "$tag" ] && echo "Tag ${tag#-}" > extra-columns + + ( cd "$(dirname "$testpath")" && + git describe --always --dirty 2>/dev/null + ) > git-commit || rm -f git-commit + + ( cd "$(dirname "$testpath")" && + gitdir=$(dirname "$(realpath "$(git rev-parse --git-dir 2>/dev/null)")") && + echo "${testpath#$gitdir/}" || echo "$testpath" + ) > test-name + + printf "%s\n" "$@" > test-args + + "$runner"/report --html-only . >/dev/null + + user_command pre_run start + + [ $verbose -gt 0 ] && printf "\n$test $*...\n" || printf "$test $*... " + "$runner"/../stbt-run $v --save-video "video.webm" "$testpath" -- "$@" \ + >"$tmpdir"/rawout 2>"$tmpdir"/rawerr & + stbtpid=$! + local start_time=$(date +%s) + + exec 3>/dev/null 4>/dev/null + [ $verbose -gt 0 ] && exec 3>&1 + [ $verbose -gt 1 ] && exec 4>&1 + + ts '[%Y-%m-%d %H:%M:%.S %z] ' < "$tmpdir"/rawout | tee stdout.log >&3 & + ts '[%Y-%m-%d %H:%M:%.S %z] ' < "$tmpdir"/rawerr | tee stderr.log >&4 & + + wait $stbtpid + exit_status=$? 
+ + [[ $exit_status -eq 0 ]] && echo OK || echo FAILED + + # Data that must be collected ASAP + echo $(( $(date +%s) - $start_time )) > duration + which sensors &>/dev/null && sensors &> sensors.log + [[ -f screenshot.png ]] || "$runner"/../stbt-screenshot &>/dev/null + echo $exit_status > exit-status + + cat <<-EOF | python && + import cv2 + im = cv2.imread('screenshot.png') + if im is not None: + cv2.imwrite( + 'thumbnail.jpg', + cv2.resize(im, (640, 640 * im.shape[0] // im.shape[1])), + [cv2.cv.CV_IMWRITE_JPEG_QUALITY, 50]) + EOF + [[ $exit_status -eq 0 ]] && rm -f screenshot.png + + user_command post_run stop + + rm "$tmpdir"/rawout "$tmpdir"/rawerr + rmdir "$tmpdir" + echo "$STBT_VERSION" > stbt-version.log + grep -q "FAIL: .*: MatchTimeout" stdout.log && template + [ -f core* ] && backtrace core* + STBT_TRACING_SOCKET="" "$runner"/report --classify-only . >/dev/null + grep -q "FAIL: .*: NoVideo" stdout.log && { + check_capture_hardware || touch unrecoverable-error; } + + if [[ $exit_status -ne 0 ]]; then + user_command recover || touch unrecoverable-error + fi + + "$runner"/report --html-only . >/dev/null + + cd .. 
+ send_state_change active_results_directory null + rm -f latest"$tag"; ln -s "$rundir" latest"$tag" + popd >/dev/null + return $exit_status +} + +send_state_change() { + if [ -z "$STBT_TRACING_SOCKET" ]; then + return + fi + + date='"'$(date --iso-8601=ns)'"' + echo '{"state_change": {"changes": {"'"$1"'": '"$2"'}, "time": '"$date"'}}' \ + | socat STDIN "UNIX-CONNECT:$STBT_TRACING_SOCKET" + true +} + +template() { + local template=$( + sed -n 's,^.*stbt-run: Searching for \(.*\.png\)$,\1,p' stderr.log | + tail -1) + [ -f "$template" ] && cp "$template" template.png +} + +backtrace() { + local gdbcommand corefile=$1 + gdbcommand=$(mktemp -t report.XXX) || die "Failed to create temp file" + echo "thread apply all bt" > $gdbcommand + gdb $(which python) $corefile -batch -x $gdbcommand &> backtrace.log + rm -f $gdbcommand +} + +user_command() { + local c=$("$runner"/../stbt-config batch.$1 2>/dev/null) + [[ -z "$c" ]] && return + "$c" $2 &1 + ) | ts '[%Y-%m-%d %H:%M:%.S %z] ' > decklinksrc.log + + if grep -q "enable video input failed" decklinksrc.log; then + local subdevice=$( + "$runner"/../stbt-config global.source_pipeline | + grep -o device-number=. | awk -F= '{print $2}') + local users=$( + lsof -F Lnc \ + /dev/blackmagic${subdevice:-0} \ + /dev/blackmagic/dv${subdevice:-0} \ + 2>/dev/null | + # Example `lsof` output: + # p70752 + # cgst-launch-0.10 + # Lstb-tester + # n/dev/blackmagic0 + awk '/^p/ { printf "\n" } + { sub(/^./, ""); printf $0 " " }') + echo "Blackmagic card in use: $users" > failure-reason + cp failure-reason failure-reason.manual + echo "Blackmagic card in use; exiting." + return 1 + + # Even if the card has no video connected to its input you see + # "VideoInputFrameArrived: Frame received - No input signal detected" + elif ! grep -q VideoInputFrameArrived decklinksrc.log; then + echo "Blackmagic card froze" > failure-reason + cp failure-reason failure-reason.manual + echo "Blackmagic card froze; exiting." 
+        return 1
+      fi
+      ;;
+  esac
+}
+
+trap on_term sigterm
+on_term() {
+  # Ignore SIGTERM. It will have been sent to the whole process group, but we
+  # want this process to finish running to write out the right results files.
+  true;
+}
+
+# Portable implementation of GNU "readlink -f" to support BSD/OSX.
+realpath() {
+  python -c 'import os, sys; print os.path.realpath(sys.argv[1])' "$1"
+}
+
+main "$@"
diff -Nru stb-tester-22/stbt-batch.d/run.py stb-tester-23-1-gf70a21c/stbt-batch.d/run.py
--- stb-tester-22/stbt-batch.d/run.py	1970-01-01 00:00:00.000000000 +0000
+++ stb-tester-23-1-gf70a21c/stbt-batch.d/run.py	2015-07-08 17:05:05.000000000 +0000
@@ -0,0 +1,296 @@
+#!/usr/bin/env python
+
+# Copyright 2015 stb-tester.com Ltd.
+# License: LGPL v2.1 or (at your option) any later version (see
+# https://github.com/stb-tester/stb-tester/blob/master/LICENSE for details).
+
+
+import argparse
+import os
+import signal
+import subprocess
+import sys
+from distutils.spawn import find_executable
+
+
+def main(argv):
+    runner = os.path.dirname(os.path.abspath(__file__))
+
+    parser = argparse.ArgumentParser(usage=(
+        "\n stbt batch run [options] test.py [test.py ...]"
+        "\n stbt batch run [options] test.py arg [arg ...] -- "
+        "test.py arg [arg ...] [-- ...]"))
+    parser.add_argument(
+        '-1', '--run-once', action="store_true", help=(
+            'Run once. The default behaviour is to run the test repeatedly as '
+            'long as it passes.'))
+    parser.add_argument(
+        '-k', '--keep-going', action="count", help=(
+            'Continue running after failures. Provide this argument once to '
+            'continue running after "uninteresting" failures, and twice to '
+            'continue running after any failure (except those that would '
+            'prevent any further test from passing).'))
+    parser.add_argument(
+        '-d', '--debug', action="store_true", help=(
+            'Enable "stbt-debug" dump of intermediate images.'))
+    parser.add_argument(
+        '-v', '--verbose', action="count", default=0, help=(
+            'Verbose. 
Provide this argument once to print stbt standard ' + 'output. Provide this argument twice to also print stbt stderr ' + 'output.')) + parser.add_argument( + '-o', '--output', default=os.curdir, help=( + 'Output directory to save the report and test-run logs under ' + '(defaults to the current directory).')) + parser.add_argument( + '-t', '--tag', help=( + 'Tag to add to test run directory names (useful to differentiate ' + 'directories when you intend to merge test results from multiple ' + 'machines).')) + parser.add_argument( + '--shuffle', action="store_true", help=( + "Run the test cases in a random order attempting to spend the same " + "total amount of time executing each test case.")) + parser.add_argument('test_name', nargs=argparse.REMAINDER) + args = parser.parse_args(argv[1:]) + + if args.tag is not None: + tag = '-' + args.tag + else: + tag = "" + + os.environ['PYTHONUNBUFFERED'] = 'x' + + term_count = [0] + + def on_term(_signo, _frame): + term_count[0] += 1 + if term_count[0] == 1: + sys.stderr.write( + "\nReceived interrupt; waiting for current test to complete.\n") + else: + sys.stderr.write("Received interrupt; exiting.\n") + sys.exit(1) + + signal.signal(signal.SIGINT, on_term) + signal.signal(signal.SIGTERM, on_term) + + failure_count = 0 + last_exit_status = 0 + + if not find_executable('ts'): + sys.stderr.write( + "No 'ts' command found; please install 'moreutils' package\n") + return 1 + + test_cases = parse_test_args(args.test_name) + + DEVNULL_R = open('/dev/null', 'r') + run_count = 0 + + if args.shuffle: + test_generator = shuffle(test_cases, repeat=not args.run_once) + else: + test_generator = loop_tests(test_cases, repeat=not args.run_once) + + for test in test_generator: + if term_count[0] > 0: + break + run_count += 1 + subenv = dict(os.environ) + subenv['tag'] = tag + subenv['v'] = '-vv' if args.debug else '-v' + subenv['verbose'] = str(args.verbose) + subenv['outputdir'] = args.output + child = None + try: + child = 
subprocess.Popen( + ("%s/run-one" % runner,) + test, stdin=DEVNULL_R, env=subenv, + preexec_fn=lambda: os.setpgid(0, 0)) + last_exit_status = child.wait() + except SystemExit: + if child: + os.kill(-child.pid, signal.SIGTERM) + child.wait() + raise + + if last_exit_status != 0: + failure_count += 1 + if os.path.exists( + "%s/latest%s/unrecoverable-error" % (args.output, tag)): + break + + if last_exit_status == 0: + continue + elif last_exit_status >= 2 and args.keep_going > 0: + # "Uninteresting" failures due to the test infrastructure + continue + elif args.keep_going >= 2: + continue + else: + break + + if run_count == 1: + # If we only run a single test a single time propagate the result + # through + return last_exit_status + elif failure_count == 0: + return 0 + else: + return 1 + + +def listsplit(l, v): + """ + A bit like str.split, but for lists + + >>> listsplit(['test 1', '--', 'test 2', 'arg1', '--', 'test3'], '--') + [['test 1'], ['test 2', 'arg1'], ['test3']] + """ + out = [] + sublist = [] + for x in l: + if x == v: + if sublist: + out.append(sublist) + sublist = [] + else: + sublist.append(x) + if sublist: + out.append(sublist) + return out + + +def parse_test_args(args): + """ + >>> parse_test_args(['test 1.py', 'test2.py', 'test3.py']) + [('test 1.py',), ('test2.py',), ('test3.py',)] + >>> parse_test_args(['test1.py', 'test2.py']) + [('test1.py',), ('test2.py',)] + >>> parse_test_args(['test1.py', '--']) + [('test1.py',)] + >>> parse_test_args(['test1.py', '--', 'test2.py']) + [('test1.py',), ('test2.py',)] + >>> parse_test_args(['test1.py', '--', 'test2.py', '--']) + [('test1.py',), ('test2.py',)] + >>> parse_test_args(['test1.py', 'test2.py']) + [('test1.py',), ('test2.py',)] + >>> parse_test_args( + ... ['test1.py', 'arg1', 'arg2', '--', 'test2.py', 'arg', '--', + ... 
'test3.py']) + [('test1.py', 'arg1', 'arg2'), ('test2.py', 'arg'), ('test3.py',)] + """ + if '--' in args: + return [tuple(x) for x in listsplit(args, '--')] + else: + return [(x,) for x in args] + + +def loop_tests(test_cases, repeat=True): + while True: + for test in test_cases: + yield test + if not repeat: + return + + +def weighted_choice(choices): + """ + See http://stackoverflow.com/questions/3679694/ + """ + import random + total = sum(w for c, w in choices) + r = random.uniform(0, total) + upto = 0 + for c, w in choices: + if upto + w > r: + return c + upto += w + assert False, "Shouldn't get here" + + +def shuffle(test_cases, repeat=True): + import random + import time + test_cases = test_cases[:] + random.shuffle(test_cases) + timings = {test: [0.0, 0] for test in test_cases} + + # Run all the tests first time round: + for test in test_cases: + start_time = time.time() + yield test + timings[test][0] += time.time() - start_time + timings[test][1] += 1 + + if not repeat: + return + + while True: + test = weighted_choice([(k, v[1] / v[0]) for k, v in timings.items()]) + start_time = time.time() + yield test + timings[test][0] += time.time() - start_time + timings[test][1] += 1 + + +def test_that_shuffle_runs_through_all_tests_initially_with_repeat(): + from itertools import islice + + test_cases = range(20) + out = list(islice(shuffle(test_cases), 20)) + + # They must be randomised: + assert test_cases != out + + # But all of them must have been run + assert test_cases == sorted(out) + + +def test_that_shuffle_runs_through_all_tests_no_repeat(): + test_cases = range(20) + out = list(shuffle(test_cases, repeat=False)) + + # They must be randomised: + assert test_cases != out + + # But all of them must have been run + assert test_cases == sorted(out) + + +def test_that_shuffle_equalises_time_across_tests(): + from mock import patch + faketime = [0.0] + + def mytime(): + return faketime[0] + + test_cases = [ + ("test1", 20), + ("test2", 10), + ("test3", 5), + 
] + + time_spent_in_test = { + "test1": 0, + "test2": 0, + "test3": 0, + } + + def fake_run_test(testcase): + time_spent_in_test[testcase[0]] += testcase[1] + faketime[0] += testcase[1] + + with patch('time.time', mytime): + generator = shuffle(test_cases) + while faketime[0] < 100000: + fake_run_test(generator.next()) + + print time_spent_in_test + + assert 30000 < time_spent_in_test["test1"] < 36000 + assert 30000 < time_spent_in_test["test2"] < 36000 + assert 30000 < time_spent_in_test["test3"] < 36000 + +if __name__ == '__main__': + sys.exit(main(sys.argv)) diff -Nru stb-tester-22/stbt-batch.d/templates/index.html stb-tester-23-1-gf70a21c/stbt-batch.d/templates/index.html --- stb-tester-22/stbt-batch.d/templates/index.html 2015-03-27 13:35:10.000000000 +0000 +++ stb-tester-23-1-gf70a21c/stbt-batch.d/templates/index.html 2015-07-08 17:05:05.000000000 +0000 @@ -277,10 +277,6 @@ var num_failed = $(".error:visible").length; // red var num_warnings = $(".warning:visible").length; // yellow var num_total = num_success + num_failed + num_warnings; - var percent_success = 0; - if (num_total > 0) { - percent_success = parseInt(num_success / num_total * 100); - } $("#totals").html( "Passed: " + format_summary(num_success, num_total) + ". 
" + diff -Nru stb-tester-22/stbt-camera.d/stbt-camera-calibrate.py stb-tester-23-1-gf70a21c/stbt-camera.d/stbt-camera-calibrate.py --- stb-tester-22/stbt-camera.d/stbt-camera-calibrate.py 2015-03-27 13:35:10.000000000 +0000 +++ stb-tester-23-1-gf70a21c/stbt-camera.d/stbt-camera-calibrate.py 2015-07-08 17:05:05.000000000 +0000 @@ -15,6 +15,7 @@ import numpy from gi.repository import Gst # pylint: disable=E0611 +import _stbt.core import stbt from _stbt import tv_driver from _stbt.config import set_config, xdg_config_dir @@ -159,14 +160,16 @@ endtime = time.time() + timeout while not success and time.time() < endtime: sample = appsink.emit("pull-sample") - with stbt._numpy_from_sample(sample, readonly=True) as input_image: + with _stbt.core._numpy_from_sample(sample, readonly=True) \ + as input_image: success, corners = cv2.findChessboardCorners( input_image, (29, 15), flags=cv2.cv.CV_CALIB_CB_ADAPTIVE_THRESH) if success: # Refine the corner measurements (not sure why this isn't built into # findChessboardCorners? 
- with stbt._numpy_from_sample(sample, readonly=True) as input_image: + with _stbt.core._numpy_from_sample(sample, readonly=True) \ + as input_image: grey_image = cv2.cvtColor(input_image, cv2.COLOR_BGR2GRAY) cv2.cornerSubPix(grey_image, corners, (5, 5), (-1, -1), @@ -197,14 +200,14 @@ sys.stdout.write("Performing Geometric Calibration\n") undistorted_appsink = \ - stbt._display.source_pipeline.get_by_name('undistorted_appsink') + stbt._dut._display.source_pipeline.get_by_name('undistorted_appsink') ideal, corners = _find_chessboard(undistorted_appsink) undistort = calculate_distortion(ideal, corners, (1920, 1080)) unperspect = calculate_perspective_transformation( ideal, undistort.do(corners)) - geometriccorrection = stbt._display.source_pipeline.get_by_name( + geometriccorrection = stbt._dut._display.source_pipeline.get_by_name( 'geometric_correction') geometriccorrection_params = undistort.describe() + unperspect.describe() for key, value in geometriccorrection_params: @@ -482,7 +485,7 @@ await_blank(0) _create_reference_png(props['black-reference-image']) - contraststretch = stbt._display.source_pipeline.get_by_name( + contraststretch = stbt._dut._display.source_pipeline.get_by_name( 'illumination_correction') for k, v in reversed(props.items()): contraststretch.set_property(k, v) @@ -571,7 +574,7 @@ def parse_args(argv): - parser = stbt.argparser() + parser = _stbt.core.argparser() tv_driver.add_argparse_argument(parser) parser.add_argument( '--noninteractive', action="store_false", dest="interactive", @@ -615,8 +618,8 @@ sink_pipeline = ('textoverlay text="After correction" ! 
' + args.sink_pipeline) - stbt.init_run(args.source_pipeline, sink_pipeline, 'none', False, - False, transformation_pipeline) + stbt.init_run(args.source_pipeline, sink_pipeline, 'none', False, False, + transformation_pipeline) tv = tv_driver.create_from_args(args, videos) diff -Nru stb-tester-22/stbt-camera.d/stbt-camera-validate.py stb-tester-23-1-gf70a21c/stbt-camera.d/stbt-camera-validate.py --- stb-tester-22/stbt-camera.d/stbt-camera-validate.py 2015-03-27 13:35:10.000000000 +0000 +++ stb-tester-23-1-gf70a21c/stbt-camera.d/stbt-camera-validate.py 2015-07-08 17:05:05.000000000 +0000 @@ -85,7 +85,7 @@ def svg_to_array(svg): - from stbt import _numpy_from_sample + from _stbt.core import _numpy_from_sample pipeline = Gst.parse_launch( 'appsrc name="src" caps="image/svg" ! rsvgdec ! ' 'videoconvert ! appsink caps="video/x-raw,format=BGR" name="sink"') diff -Nru stb-tester-22/stbt-control stb-tester-23-1-gf70a21c/stbt-control --- stb-tester-22/stbt-control 2015-03-27 13:35:10.000000000 +0000 +++ stb-tester-23-1-gf70a21c/stbt-control 2015-07-08 17:05:05.000000000 +0000 @@ -13,6 +13,7 @@ import threading import time +import _stbt.control import stbt SPECIAL_CHARS = { @@ -40,7 +41,7 @@ if args.help_keymap: sys.exit(show_help_keymap()) - remote = stbt.control.uri_to_remote(args.control, None) + remote = _stbt.control.uri_to_remote(args.control, None) if args.remote_control_key: # Send a single key and exit remote.press(args.remote_control_key) diff -Nru stb-tester-22/stbt-record stb-tester-23-1-gf70a21c/stbt-record --- stb-tester-22/stbt-record 2015-03-27 13:35:10.000000000 +0000 +++ stb-tester-23-1-gf70a21c/stbt-record 2015-07-08 17:05:05.000000000 +0000 @@ -9,11 +9,13 @@ import itertools import sys +import _stbt.control +import _stbt.core import stbt def main(argv): - parser = stbt.argparser() + parser = _stbt.core.argparser() parser.prog = 'stbt record' parser.description = 'Create an stb-tester test script' parser.add_argument( @@ -53,7 +55,7 @@ 
     script_out.write("import stbt\n\n\n")
     script_out.write("def test_that_WRITE_TESTCASE_DESCRIPTION_HERE():\n")
     try:
-        for key in stbt.control.uri_to_remote_recorder(control_recorder):
+        for key in _stbt.control.uri_to_remote_recorder(control_recorder):
             write_wait_for_match(script_out, count.next(), old_key)
             script_out.write("    stbt.press('%s')\n" % key)
             stbt.press(key)
diff -Nru stb-tester-22/stbt-run stb-tester-23-1-gf70a21c/stbt-run
--- stb-tester-22/stbt-run	2015-03-27 13:35:10.000000000 +0000
+++ stb-tester-23-1-gf70a21c/stbt-run	2015-07-08 17:05:05.000000000 +0000
@@ -11,15 +11,20 @@
 import sys
 import traceback
 
+import _stbt.core
 import stbt
+from _stbt.state_watch import new_state_sender
 
-parser = stbt.argparser()
+parser = _stbt.core.argparser()
 parser.prog = 'stbt run'
 parser.description = 'Run an stb-tester test script'
 parser.add_argument(
     '--save-video', help='Record video to the specified file', metavar='FILE',
     default=stbt.get_config('run', 'save_video'))
 parser.add_argument(
+    '--save-trace', metavar='FILE', default=None,
+    help='Write state to this file, as xz compressed newline-separated JSON')
+parser.add_argument(
     'script', metavar='FILE[::TESTCASE]', help=(
         "The python test script to run. 
Optionally specify a python function " "name to run that function; otherwise only the script's top-level will " @@ -56,6 +61,7 @@ sys.path = [os.path.abspath(module_dir)] + sys.path return __import__(module_name) +_tracer = None try: # pylint: disable=W0611 @@ -63,24 +69,42 @@ args.source_pipeline, args.sink_pipeline, args.control, args.save_video, args.restart_source, stbt.get_config('global', 'transformation_pipeline')) + _tracer = new_state_sender(args.save_trace) # pylint: disable=W0212 + + _absfilename = None + + def tracefunc(frame_, event, _): + if event == "line" and frame_.f_code.co_filename == _absfilename: + _tracer.log_current_line(frame_.f_code.co_filename, frame_.f_lineno) + return tracefunc if '::' in args.script: - filename, function = args.script.split('::', 1) - module = import_by_filename(filename) - function = getattr(module, function) + _filename, funcname = args.script.split('::', 1) + _absfilename = os.path.abspath(_filename) + module = import_by_filename(_filename) + function = getattr(module, funcname) + _tracer.log_test_starting(args.script, _filename, funcname, + function.func_code.co_firstlineno) + sys.settrace(tracefunc) function() else: + _filename = os.path.abspath(args.script) + _absfilename = os.path.abspath(_filename) + _tracer.log_test_starting(args.script, args.script, "", 1) + sys.settrace(tracefunc) + + # pylint: disable=W0612 from stbt import ( # For backwards compatibility. We want to encourage people to expli- # citly import stbt in their scripts, so don't add new APIs here. 
press, press_until_match, wait_for_match, wait_for_motion, - detect_match, MatchResult, Position, detect_motion, MotionResult, - save_frame, get_frame, MatchParameters, + detect_match, MatchResult, Position, detect_motion, + MotionResult, save_frame, get_frame, MatchParameters, debug, UITestError, UITestFailure, MatchTimeout, MotionTimeout, ConfigurationError) __file__ = args.script - sys.path.insert(0, os.path.dirname(os.path.abspath(args.script))) - execfile(args.script) + sys.path.insert(0, os.path.dirname(_filename)) + execfile(_filename) except Exception as e: # pylint: disable=W0703 error_message = str(e) if not error_message and isinstance(e, AssertionError): @@ -96,4 +120,8 @@ else: sys.exit(2) # Error finally: + sys.settrace(None) + if _tracer: + _tracer.log_test_ended() + _tracer.close() stbt.teardown_run() diff -Nru stb-tester-22/stbt-templatematch stb-tester-23-1-gf70a21c/stbt-templatematch --- stb-tester-22/stbt-templatematch 2015-03-27 13:35:10.000000000 +0000 +++ stb-tester-23-1-gf70a21c/stbt-templatematch 2015-07-08 17:05:05.000000000 +0000 @@ -13,6 +13,7 @@ import cv2 +import _stbt.logging import stbt @@ -68,7 +69,7 @@ def noop_contextmanager(): yield -with (stbt.logging.scoped_debug_level(2) if args.verbose +with (_stbt.logging.scoped_debug_level(2) if args.verbose else noop_contextmanager()): try: result = stbt.match( diff -Nru stb-tester-22/tests/test-camera.sh stb-tester-23-1-gf70a21c/tests/test-camera.sh --- stb-tester-22/tests/test-camera.sh 2015-03-27 13:35:10.000000000 +0000 +++ stb-tester-23-1-gf70a21c/tests/test-camera.sh 2015-07-08 17:05:05.000000000 +0000 @@ -82,13 +82,12 @@ } test_that_stbt_camera_calibrate_corrects_for_geometric_distortion() { - skip_if_no_stbt_plugins + skip_if_no_stbt_camera + skip_if_no_rsvg_plugins set_config camera.tv_driver assume set_config global.control none - skip_if_no_rsvg_plugins - start_fake_video_src_launch_1080 \ uridecodebin "uri=file://$testdir/capture-chessboard.png" \ ! videoconvert ! 
imagefreeze && diff -Nru stb-tester-22/tests/test-match.sh stb-tester-23-1-gf70a21c/tests/test-match.sh --- stb-tester-22/tests/test-match.sh 2015-03-27 13:35:10.000000000 +0000 +++ stb-tester-23-1-gf70a21c/tests/test-match.sh 2015-07-08 17:05:05.000000000 +0000 @@ -223,7 +223,7 @@ test_press_until_match_reads_interval_secs_from_config_file() { cat > test-3s.py <<-EOF && import stbt - start = stbt._display.gst_samples().next().get_buffer().pts + start = stbt._dut._display.gst_samples().next().get_buffer().pts match = press_until_match( "checkers-8", "$testdir/videotestsrc-checkers-8.png") assert (match.timestamp - start) >= 3e9, ( @@ -233,7 +233,7 @@ cat > test-1s.py <<-EOF && import stbt - start = stbt._display.gst_samples().next().get_buffer().pts + start = stbt._dut._display.gst_samples().next().get_buffer().pts match = press_until_match( "checkers-8", "$testdir/videotestsrc-checkers-8.png") assert (match.timestamp - start) < 3e9, ( diff -Nru stb-tester-22/tests/test_ocr.py stb-tester-23-1-gf70a21c/tests/test_ocr.py --- stb-tester-22/tests/test_ocr.py 2015-03-27 13:35:10.000000000 +0000 +++ stb-tester-23-1-gf70a21c/tests/test_ocr.py 2015-07-08 17:05:05.000000000 +0000 @@ -8,6 +8,7 @@ from nose.plugins.skip import SkipTest from nose.tools import eq_, raises +import _stbt.core import stbt @@ -66,7 +67,7 @@ mode=stbt.OcrMode.SINGLE_WORD) # pylint: disable=W0212 - if stbt._tesseract_version() < distutils.version.LooseVersion('3.03'): + if _stbt.core._tesseract_version() < distutils.version.LooseVersion('3.03'): raise SkipTest('tesseract is too old') # Now the real test: @@ -79,7 +80,8 @@ @raises(RuntimeError) def test_that_with_old_tesseract_ocr_raises_an_exception_with_patterns(): # pylint: disable=W0212 - if stbt._tesseract_version() >= distutils.version.LooseVersion('3.03'): + if (_stbt.core._tesseract_version() + >= distutils.version.LooseVersion('3.03')): raise SkipTest('tesseract is too new') stbt.ocr( diff -Nru stb-tester-22/tests/test-ocr.sh 
stb-tester-23-1-gf70a21c/tests/test-ocr.sh --- stb-tester-22/tests/test-ocr.sh 2015-03-27 13:35:10.000000000 +0000 +++ stb-tester-23-1-gf70a21c/tests/test-ocr.sh 2015-07-08 17:05:05.000000000 +0000 @@ -14,6 +14,7 @@ stbt run -v \ --source-pipeline="videotestsrc pattern=black ! \ video/x-raw,format=BGR ! \ - textoverlay text=Hello\ there font-desc=Sans\ 48" \ + textoverlay text=Hello\ there font-desc=Sans\ 48 ! \ + video/x-raw,format=BGR" \ test.py } diff -Nru stb-tester-22/tests/test_power.py stb-tester-23-1-gf70a21c/tests/test_power.py --- stb-tester-22/tests/test_power.py 1970-01-01 00:00:00.000000000 +0000 +++ stb-tester-23-1-gf70a21c/tests/test_power.py 2015-07-08 17:05:05.000000000 +0000 @@ -0,0 +1,79 @@ +"""Tests for the _ATEN_PE6108G PDU class""" +from contextlib import contextmanager + +from mock import patch +from nose.tools import raises +from pysnmp.proto.rfc1902 import Integer + +from _stbt.power import uri_to_power_outlet + + +def mock_data(int_value): + """Match the format of the data returned from pysnmp""" + return (None, None, None, [(oid, Integer(int_value))]) + + +@contextmanager +def mock_command_gen(): + """Perform mocks and return a mocked CommandGenerator instance.""" + with patch('time.sleep'),\ + patch('pysnmp.entity.rfc3413.oneliner.cmdgen.UdpTransportTarget'),\ + patch('pysnmp.entity.rfc3413.oneliner.cmdgen.CommandGenerator')\ + as mocked_command_gen: + yield mocked_command_gen.return_value + + +outlet = 1 +oid = "1.3.6.1.4.1.21317.1.3.2.2.2.2.{0}.0".format(outlet + 1) + + +def test_aten_get_on(): + with mock_command_gen() as mock_command: + mock_command.getCmd.return_value = mock_data(2) + aten = uri_to_power_outlet('aten:mock.host.name:1') + + result = aten.get() + + assert result is True + + +def test_aten_get_off(): + with mock_command_gen() as mock_command: + mock_command.getCmd.return_value = mock_data(1) + aten = uri_to_power_outlet('aten:mock.host.name:1') + + result = aten.get() + + assert result is False + + +def 
test_aten_set_on(): + with mock_command_gen() as mock_command: + mock_command.setCmd.return_value = mock_data(2) + mock_command.getCmd.side_effect = [mock_data(n) for n in (1, 1, 1, 2)] + aten = uri_to_power_outlet('aten:mock.host.name:1') + + aten.set(True) + + assert mock_command.getCmd.call_count == 4 + + +def test_aten_set_off(): + with mock_command_gen() as mock_command: + mock_command.setCmd.return_value = mock_data(1) + mock_command.getCmd.side_effect = [mock_data(n) for n in (2, 2, 1)] + aten = uri_to_power_outlet('aten:mock.host.name:1') + + aten.set(False) + + assert mock_command.getCmd.call_count == 3 + + +@raises(RuntimeError) +def test_aten_set_timeout(): + with mock_command_gen() as mock_command: + mock_command.setCmd.return_value = mock_data(1) + mock_command.getCmd.return_value = mock_data(2) + aten = uri_to_power_outlet('aten:mock.host.name:1') + + aten.set(False) diff -Nru stb-tester-22/tests/test-stbt-batch.sh stb-tester-23-1-gf70a21c/tests/test-stbt-batch.sh --- stb-tester-22/tests/test-stbt-batch.sh 2015-03-27 13:35:10.000000000 +0000 +++ stb-tester-23-1-gf70a21c/tests/test-stbt-batch.sh 2015-07-08 17:05:05.000000000 +0000 @@ -96,46 +96,6 @@ grep -q tests/test.py index.html || fail "test name not in index.html" } -test_stbt_batch_run_parse_test_args() { - sed -n '/^parse_test_args() {/,/^}/ p' "$srcdir"/stbt-batch.d/run \ - > parse_test_args.sh && - . 
parse_test_args.sh && - declare -f parse_test_args || fail "'parse_test_args' not defined" - - parse_test_args "test 1.py" test2.py test3.py | - tee /dev/stderr | # print to log - while IFS=$'\t' read -a test; do echo "$test"; done > output1.log - cat > expected1.log <<-EOF - test 1.py - test2.py - test3.py - EOF - diff -u expected1.log output1.log || - fail "Unexpected output from 'parse_test_args' without '--'" - - parse_test_args test1.py "arg 1" arg2 -- test2.py arg -- test3.py | - tee /dev/stderr | # print to log - while IFS=$'\t' read -a test; do - for x in "${test[@]}"; do printf "'$x' "; done; printf "\n" - done > output2.log - cat > expected2.log <<-EOF - 'test1.py' 'arg 1' 'arg2' - 'test2.py' 'arg' - 'test3.py' - EOF - diff -u expected2.log output2.log || - fail "Unexpected output from 'parse_test_args' with '--'" -} - -test_stbt_batch_run_killtree() { - sed -n '/^killtree()/,/^}/ p' "$srcdir"/stbt-batch.d/run > killtree.sh && - . killtree.sh && - declare -f killtree || fail "'killtree' not defined" - - . "$testdir"/test-run-tests.sh - test_killtree -} - test_signalname() { sed -n '/^signalname()/,/^}/ p' "$srcdir"/stbt-batch.d/report \ > signalname.sh && @@ -433,6 +393,26 @@ [ "$?" = 2 ] || fail "Test should error" } +test_that_stbt_batch_reports_results_directory() { + create_test_repo + export STBT_TRACING_SOCKET=$PWD/stbt_tracing_socket + socat -d -d -d -D -t10 UNIX-LISTEN:$STBT_TRACING_SOCKET,fork GOPEN:trace.log & + SOCAT_PID=$! + + while ! 
[ -e "$PWD/stbt_tracing_socket" ]; do + sleep 0.1 + done + + stbt batch run -1vv tests/test.py tests/test2.py \ + || fail "Tests should succeed" + + [ "$(grep active_results_directory trace.log | wc -l)" = 4 ] \ + || fail "active_results_directory not written" + kill $SOCAT_PID + + cat trace.log +} + test_stbt_batch_output_dir() { create_test_repo mkdir "my results" @@ -488,3 +468,12 @@ stbt batch run -1 tests/test_functions.py::test_that_asserts_the_impossible assert grep -q "AssertionError: assert 1 + 1 == 3" latest/failure-reason } + +test_that_stbt_batch_run_shuffle_runs_tests() { + create_test_repo + stbt batch run -1 --shuffle \ + tests/test_functions.py::test_that_does_nothing \ + tests/test_functions.py::test_that_this_test_is_run + ls -d ????-??-??_??.??.??* > testruns + [[ $(cat testruns | wc -l) -eq 2 ]] || fail "Expected 2 test runs" +} diff -Nru stb-tester-22/tests/test-stbt-py.sh stb-tester-23-1-gf70a21c/tests/test-stbt-py.sh --- stb-tester-22/tests/test-stbt-py.sh 2015-03-27 13:35:10.000000000 +0000 +++ stb-tester-23-1-gf70a21c/tests/test-stbt-py.sh 2015-07-08 17:05:05.000000000 +0000 @@ -347,7 +347,7 @@ test_that_restart_source_option_is_read() { cat > test.py <<-EOF && import stbt - print "value: %s" % stbt._display.restart_source_enabled + print "value: %s" % stbt._dut._display.restart_source_enabled EOF # Read from the command line stbt run -v --restart-source --control none test.py && @@ -397,10 +397,14 @@ } test_clock_visualisation() { - python -c "import stbt, distutils, sys; sys.exit( \ - 0 if stbt._tesseract_version() >= distutils.version.LooseVersion('3.03') - else 1)" \ - || skip "Requires tesseract >= 3.03 for 'tesseract_user_patterns'" + PYTHONPATH="$srcdir" python -c "import stbt, _stbt.core, distutils, sys; \ + sys.exit(0 if (_stbt.core._tesseract_version() \ + >= distutils.version.LooseVersion('3.03')) else 77)" + case $? 
in + 0) true;; + 77) skip "Requires tesseract >= 3.03 for 'tesseract_user_patterns'";; + *) fail "Probing tesseract version failed";; + esac cat > test.py <<-EOF && import time @@ -420,7 +424,7 @@ def read_time(): s = stbt.ocr( stbt.get_frame(), mode=stbt.OcrMode.SINGLE_LINE, - tesseract_user_patterns=["\d\d:\d\d:\d\d:\d\d"], + tesseract_user_patterns=["\d\d:\d\d:\d\d.\d\d"], region=stbt.Region(x=5, y=5, right=200, bottom=35)).replace(" ", "") d = datetime.date.today() return datetime.datetime( diff -Nru stb-tester-22/tests/test-stbt-run.sh stb-tester-23-1-gf70a21c/tests/test-stbt-run.sh --- stb-tester-22/tests/test-stbt-run.sh 2015-03-27 13:35:10.000000000 +0000 +++ stb-tester-23-1-gf70a21c/tests/test-stbt-run.sh 2015-07-08 17:05:05.000000000 +0000 @@ -151,3 +151,50 @@ stbt run tests/test.py::test_that_this_test_is_run [ -e "touched" ] || fail "Test not run" } + +state_printer() { + cat > state_printer.py <<-EOF + import json + import sys + from _stbt import state_watch + data = {} + sr = state_watch.StateReceiver(data) + for line in sys.stdin: + sr.write(line) + print json.dumps(data, sort_keys=True) + EOF + PYTHONPATH=$srcdir python state_printer.py +} + +run_state_test() { + cat > test.py <<-EOF && + stbt.wait_for_match("$testdir/videotestsrc-redblue.png") + EOF + + cat > expected_states <<-EOF + {"test_run": {"current_line": {}, "test_case": {"file": "test.py", "function": "", "line": 1, "name": "test.py"}}} + {"test_run": {"current_line": {"file": "$PWD/test.py", "line": 1}, "test_case": {"file": "test.py", "function": "", "line": 1, "name": "test.py"}}} + {"test_run": {}} + EOF + + stbt run "$@" test.py || fail "Test failed" +} + +test_that_stbt_run_tracing_is_written_to_file() { + run_state_test --save-trace=trace.jsonl.xz || fail "Test failed" + [ -e "trace.jsonl.xz" ] || fail "Trace not written" + xzcat "trace.jsonl.xz" | grep -q "state_change" || fail "state_change not written" + diff expected_states <(xzcat "trace.jsonl.xz" | state_printer) +} + 
+test_that_stbt_run_tracing_is_written_to_socket() { + export STBT_TRACING_SOCKET=$PWD/trace-socket + socat UNIX-LISTEN:$STBT_TRACING_SOCKET,fork OPEN:trace.jsonl,creat,append & + SOCAT_PID=$! + sleep 1 + + run_state_test || fail "Test failed" + kill "$SOCAT_PID" + grep -q "state_change" "trace.jsonl" || fail "state_change not written" + diff expected_states <(state_printer <"trace.jsonl") || fail "Wrong states" +} diff -Nru stb-tester-22/.travis.yml stb-tester-23-1-gf70a21c/.travis.yml --- stb-tester-22/.travis.yml 2015-03-27 13:35:10.000000000 +0000 +++ stb-tester-23-1-gf70a21c/.travis.yml 2015-07-08 17:05:05.000000000 +0000 @@ -27,6 +27,7 @@ python-gobject python-jinja2 python-lxml + python-lzma python-matplotlib python-nose python-numpy @@ -34,6 +35,7 @@ python-scipy python-serial python-yaml + socat tesseract-ocr tesseract-ocr-deu tesseract-ocr-eng @@ -44,9 +46,11 @@ - sudo pip install astroid==1.2.1 isort==3.9.0 + mock pylint==1.3.1 + pysnmp rednose - - git clone http://git.chromium.org/webm/webminspector.git ~/webminspector + - git clone https://chromium.googlesource.com/webm/webminspector ~/webminspector - | { wget http://ftpmirror.gnu.org/parallel/parallel-20140522.tar.bz2 || diff -Nru stb-tester-22/VERSION stb-tester-23-1-gf70a21c/VERSION --- stb-tester-22/VERSION 2015-03-27 13:35:10.000000000 +0000 +++ stb-tester-23-1-gf70a21c/VERSION 2015-07-08 17:05:05.000000000 +0000 @@ -1 +1 @@ -22 +23-1-gf70a21c
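A note on one design change in this diff: the old bash `killtree` (removed from `stbt-batch.d/run` above) walked the `ps` process tree recursively; the new `stbt-batch.d/run.py` instead starts each `run-one` in its own POSIX process group via `preexec_fn=lambda: os.setpgid(0, 0)` and kills the whole tree with a single `os.kill(-child.pid, signal.SIGTERM)`. A minimal standalone demonstration of that mechanism (the `sh -c 'sleep 60 & wait'` command is illustrative, not from the diff):

```python
import os
import signal
import subprocess
import time

# Start a shell that forks its own child, placing the shell in a new
# process group so the whole subtree can be signalled at once.
child = subprocess.Popen(
    ["sh", "-c", "sleep 60 & wait"],
    preexec_fn=lambda: os.setpgid(0, 0))  # child becomes its own group leader

time.sleep(0.5)  # give the shell time to fork its "sleep" child

# A negative pid sends the signal to every process in that group,
# i.e. both the shell and its "sleep" child.
os.kill(-child.pid, signal.SIGTERM)
status = child.wait()
```

This is why run.py no longer needs a recursive tree walk: the kernel delivers the signal to every group member atomically, with no race against processes forking mid-walk.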
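The tracing protocol used throughout this diff (`send_state_change` in `run-one`, `--save-trace` in `stbt-run`, and the `state_printer` helper in `tests/test-stbt-run.sh`) is newline-delimited JSON of the form `{"state_change": {"changes": {...}, "time": ...}}`. A minimal consumer can fold these messages into a nested state dict. The dot-separated-key and null-deletes semantics below are *inferred* from the `expected_states` fixture in `test-stbt-run.sh`, not taken from `_stbt/state_watch.py` itself, so treat this as a sketch of the protocol rather than the real `StateReceiver`:

```python
import json

def apply_state_change(state, line):
    """Fold one newline-delimited JSON "state_change" message into `state`.

    ASSUMPTION: keys are dot-separated paths into nested dicts, and a null
    value deletes the entry -- inferred from the expected_states fixture in
    tests/test-stbt-run.sh, not from _stbt/state_watch.py.
    """
    msg = json.loads(line)
    for key, value in msg["state_change"]["changes"].items():
        parts = key.split(".")
        node = state
        for p in parts[:-1]:
            node = node.setdefault(p, {})
        if value is None:
            node.pop(parts[-1], None)  # null means "state entry removed"
        else:
            node[parts[-1]] = value
    return state

# The same message shape that run-one's send_state_change emits:
state = {}
apply_state_change(state, '{"state_change": {"changes": '
                          '{"active_results_directory": "/tmp/results/run1"}, '
                          '"time": "2015-07-08T17:05:05+00:00"}}')
```

Feeding the trace back through such a reader is exactly how `test_that_stbt_run_tracing_is_written_to_file` verifies the `--save-trace` output above.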
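The `--shuffle` scheduler in `stbt-batch.d/run.py` above weights each test by runs-per-unit-time (`v[1] / v[0]` in `shuffle`), so a test that has already consumed a lot of wall-clock time is picked less often; in steady state each test accrues roughly equal total time, which is what `test_that_shuffle_equalises_time_across_tests` asserts. A standalone sketch of that feedback loop (test names and durations are made up for illustration):

```python
import random

def weighted_choice(choices):
    # Same algorithm as run.py's weighted_choice: pick c from (c, w)
    # pairs with probability proportional to w.
    total = sum(w for _c, w in choices)
    r = random.uniform(0, total)
    upto = 0.0
    for c, w in choices:
        if upto + w > r:
            return c
        upto += w
    return choices[-1][0]  # guard against floating-point rounding

random.seed(0)
# Hypothetical per-run durations in seconds (not from the diff):
durations = {"fast.py": 1.0, "medium.py": 5.0, "slow.py": 25.0}
time_spent = dict(durations)            # every test has run once already
runs = {name: 1 for name in durations}

for _ in range(20000):
    # Weight = runs / total time: slower tests get proportionally fewer
    # runs, so total wall-clock time per test equalises.
    name = weighted_choice(
        [(n, runs[n] / time_spent[n]) for n in durations])
    time_spent[name] += durations[name]
    runs[name] += 1

total = sum(time_spent.values())
shares = {n: time_spent[n] / total for n in durations}
```

Intuition: once a test's accumulated time `T` reflects its duration `d`, its weight tends to `1/d`, so the expected time added per scheduling step is the same for every test, regardless of how slow it is.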