diff -Nru pushpin-0.0~20140620/config/pushpin.conf pushpin-1.0.0/config/pushpin.conf --- pushpin-0.0~20140620/config/pushpin.conf 2014-06-25 13:18:48.000000000 +0000 +++ pushpin-1.0.0/config/pushpin.conf 2014-09-15 17:04:41.000000000 +0000 @@ -55,11 +55,6 @@ [handler] -# redis server to use for storing state -redis_host=127.0.0.1 -redis_port=6379 -redis_prefix=pushpin- - # bind PULL for receiving publish commands push_in_spec=tcp://127.0.0.1:5560 diff -Nru pushpin-0.0~20140620/debian/changelog pushpin-1.0.0/debian/changelog --- pushpin-0.0~20140620/debian/changelog 2014-06-26 10:49:42.000000000 +0000 +++ pushpin-1.0.0/debian/changelog 2014-09-18 20:28:11.000000000 +0000 @@ -1,3 +1,23 @@ +pushpin (1.0.0-3) unstable; urgency=low + + * Fix typo in debian/rules, which prevented proper use of hardening flags. + + -- Jan Niehusmann Thu, 18 Sep 2014 20:24:58 +0200 + +pushpin (1.0.0-2) unstable; urgency=low + + * Remove dependencies on redis-server, python-redis + + -- Jan Niehusmann Tue, 16 Sep 2014 23:51:11 +0200 + +pushpin (1.0.0-1) unstable; urgency=low + + * New upstream version 1.0.0 + * Update Vcs URL + * implement hardening using buildflags instead of hardening-wrapper + + -- Jan Niehusmann Tue, 16 Sep 2014 21:51:41 +0200 + pushpin (0.0~20140620-1) unstable; urgency=low * Merge changes from upstream git. 
(upstream commit id 897ed1b) diff -Nru pushpin-0.0~20140620/debian/compat pushpin-1.0.0/debian/compat --- pushpin-0.0~20140620/debian/compat 2014-06-26 10:49:42.000000000 +0000 +++ pushpin-1.0.0/debian/compat 2014-09-18 20:28:11.000000000 +0000 @@ -1 +1 @@ -8 +9 diff -Nru pushpin-0.0~20140620/debian/control pushpin-1.0.0/debian/control --- pushpin-0.0~20140620/debian/control 2014-06-26 10:49:42.000000000 +0000 +++ pushpin-1.0.0/debian/control 2014-09-18 20:28:11.000000000 +0000 @@ -2,15 +2,15 @@ Section: net Priority: extra Maintainer: Jan Niehusmann -Build-Depends: debhelper (>= 8.0.0), libqt4-dev (>= 4.7), libqca2-dev (>= 2.0), libzmq3-dev (>= 2.0), qca2-utils, pkg-config, hardening-wrapper, libqjson-dev, qconf, libqca2-plugin-ossl +Build-Depends: debhelper (>= 9), libqt4-dev (>= 4.7), libqca2-dev (>= 2.0), libzmq3-dev (>= 2.0), qca2-utils, pkg-config, libqjson-dev, qconf, libqca2-plugin-ossl Standards-Version: 3.9.5 Homepage: https://github.com/fanout/pushpin -Vcs-Git: git://git.debian.org/users/jan/pushpin.git -Vcs-Browser: http://git.debian.org/?p=users/jan/pushpin.git;a=summary +Vcs-Git: git://anonscm.debian.org/users/jan/pushpin.git +Vcs-Browser: http://anonscm.debian.org/gitweb/?p=users/jan/pushpin.git;a=summary Package: pushpin Architecture: any -Depends: ${shlibs:Depends}, ${misc:Depends}, mongrel2-core (>= 1.9.0), zurl (>= 1.3.0-2), redis-server, python-zmq, python-redis, python-setproctitle, python-jinja2, python, adduser, python-tnetstring +Depends: ${shlibs:Depends}, ${misc:Depends}, mongrel2-core (>= 1.9.0), zurl (>= 1.3.0-2), python-zmq, python-setproctitle, python-jinja2, python, adduser, python-tnetstring Description: HTTP reverse proxy server for streaming and long-polling services Pushpin is an HTTP reverse proxy server that makes it easy to implement streaming and long-polling services. 
It communicates with backend web diff -Nru pushpin-0.0~20140620/debian/copyright pushpin-1.0.0/debian/copyright --- pushpin-0.0~20140620/debian/copyright 2014-06-26 10:49:42.000000000 +0000 +++ pushpin-1.0.0/debian/copyright 2014-09-18 20:28:11.000000000 +0000 @@ -1,17 +1,13 @@ Format: http://www.debian.org/doc/packaging-manuals/copyright-format/1.0/ Upstream-Name: pushpin -Source: As there is no released tarball, the source was retrieved from the git repo at - https://github.com/fanout/pushpin/ - and from the git submodules at - https://github.com/fanout/common - https://github.com/jkarneges/qzmq +Source: http://packages.fanout.io/source/pushpin-1.0.0.tar.bz2 Files: * -Copyright: 2012-2013 Fanout, Inc. +Copyright: 2012-2014 Fanout, Inc. License: AGPL-3+ Files: common/* -Copyright: 2012-2013 Fanout, Inc. +Copyright: 2012-2014 Fanout, Inc. License: GPL-3+ Files: common/processquit.* diff -Nru pushpin-0.0~20140620/debian/patches/debian-changes pushpin-1.0.0/debian/patches/debian-changes --- pushpin-0.0~20140620/debian/patches/debian-changes 2014-06-26 10:50:25.000000000 +0000 +++ pushpin-1.0.0/debian/patches/debian-changes 2014-09-19 08:41:32.000000000 +0000 @@ -4,8 +4,8 @@ Option single-debian-patch is used as the changes are tracked in git. 
---- pushpin-0.0~20140620.orig/Makefile -+++ pushpin-0.0~20140620/Makefile +--- pushpin-1.0.0.orig/Makefile ++++ pushpin-1.0.0/Makefile @@ -1,6 +1,7 @@ -prefix = /usr/local -varprefix = /var/local @@ -30,8 +30,8 @@ pushpin.inst: pushpin sed -e "s,^default_config_dir =.*,default_config_dir = \"$(configdir)\",g" pushpin > pushpin.inst && chmod 755 pushpin.inst ---- pushpin-0.0~20140620.orig/runner/m2adapter.conf.template -+++ pushpin-0.0~20140620/runner/m2adapter.conf.template +--- pushpin-1.0.0.orig/runner/m2adapter.conf.template ++++ pushpin-1.0.0/runner/m2adapter.conf.template @@ -12,22 +12,22 @@ m2_send_idents={% for i in instances %}{ m2_control_specs={% for i in instances %}{% if not loop.first %},{% endif %}{{ i.control_spec }}{% endfor %} @@ -61,8 +61,8 @@ # don't send more than this to mongrel2 m2_client_buffer=200000 ---- pushpin-0.0~20140620.orig/runner/services.py -+++ pushpin-0.0~20140620/runner/services.py +--- pushpin-1.0.0.orig/runner/services.py ++++ pushpin-1.0.0/runner/services.py @@ -50,10 +50,10 @@ def write_m2adapter_config(configpath, r instances = list() for port in ports: @@ -77,8 +77,8 @@ instances.append(i) vars = dict() ---- pushpin-0.0~20140620.orig/runner/zurl.conf -+++ pushpin-0.0~20140620/runner/zurl.conf +--- pushpin-1.0.0.orig/runner/zurl.conf ++++ pushpin-1.0.0/runner/zurl.conf @@ -3,13 +3,13 @@ instance_id= @@ -96,8 +96,8 @@ # bind ROUTER for handling non-streamed requests/responses in_req_spec= ---- pushpin-0.0~20140620.orig/runner/mongrel2.conf.template -+++ pushpin-0.0~20140620/runner/mongrel2.conf.template +--- pushpin-1.0.0.orig/runner/mongrel2.conf.template ++++ pushpin-1.0.0/runner/mongrel2.conf.template @@ -1,7 +1,7 @@ {% for port in ports %}handler_{{ port.value }} = Handler( - send_spec="ipc:///tmp/pushpin-m2-out-{{ port.value }}", @@ -117,8 +117,8 @@ default_host="default", name="server-{{ port.value }}", port={{ port.value }}, ---- pushpin-0.0~20140620.orig/config/internal.conf -+++ 
pushpin-0.0~20140620/config/internal.conf +--- pushpin-1.0.0.orig/config/internal.conf ++++ pushpin-1.0.0/config/internal.conf @@ -1,68 +1,68 @@ [proxy] # list of connect PULL for receiving mongrel2 HTTP/WS requests @@ -210,8 +210,8 @@ # list of connect PUB for sending mongrel2 HTTP/WS responses -m2a_out_specs=ipc:///tmp/pushpin-m2zhttp-in,ipc:///tmp/pushpin-m2zws-in +m2a_out_specs=ipc:///var/run/pushpin/pushpin-m2zhttp-in,ipc:///var/run/pushpin/pushpin-m2zws-in ---- pushpin-0.0~20140620.orig/config/pushpin.conf -+++ pushpin-0.0~20140620/config/pushpin.conf +--- pushpin-1.0.0.orig/config/pushpin.conf ++++ pushpin-1.0.0/config/pushpin.conf @@ -5,7 +5,7 @@ include=internal.conf [runner] @@ -221,7 +221,7 @@ # plain HTTP port that mongrel2 should listen on http_port=7999 -@@ -68,7 +68,7 @@ push_in_http_addr=127.0.0.1 +@@ -63,7 +63,7 @@ push_in_http_addr=127.0.0.1 push_in_http_port=5561 # bind PUB for sending stats (metrics, subscription info, etc) @@ -231,3 +231,47 @@ # bind REP for responding to commands -command_spec=ipc:///tmp/pushpin-command +command_spec=ipc:///var/run/pushpin/pushpin-command +--- pushpin-1.0.0.orig/m2adapter/src/src.pro ++++ pushpin-1.0.0/m2adapter/src/src.pro +@@ -15,3 +15,8 @@ unix:!isEmpty(BINDIR) { + target.path = $$BINDIR + INSTALLS += target + } ++ ++QMAKE_CXXFLAGS += $(Q_CXXFLAGS) ++QMAKE_CFLAGS_DEBUG += $(Q_CFLAGS) ++QMAKE_CFLAGS_RELEASE += $(Q_CFLAGS) ++QMAKE_LFLAGS += $(Q_LDFLAGS) +--- pushpin-1.0.0.orig/proxy/tests/tests.pri ++++ pushpin-1.0.0/proxy/tests/tests.pri +@@ -18,3 +18,8 @@ INCLUDEPATH += $$QZMQ_DIR/src + + INCLUDEPATH += $$COMMON_DIR + DEFINES += NO_IRISNET ++ ++QMAKE_CXXFLAGS += $(Q_CXXFLAGS) ++QMAKE_CFLAGS_DEBUG += $(Q_CFLAGS) ++QMAKE_CFLAGS_RELEASE += $(Q_CFLAGS) ++QMAKE_LFLAGS += $(Q_LDFLAGS) +--- pushpin-1.0.0.orig/proxy/src/pro/pushpin-proxy/pushpin-proxy.pro ++++ pushpin-1.0.0/proxy/src/pro/pushpin-proxy/pushpin-proxy.pro +@@ -18,3 +18,8 @@ unix:!isEmpty(BINDIR) { + target.path = $$BINDIR + INSTALLS += target + } ++ 
++QMAKE_CXXFLAGS += $(Q_CXXFLAGS) ++QMAKE_CFLAGS_DEBUG += $(Q_CFLAGS) ++QMAKE_CFLAGS_RELEASE += $(Q_CFLAGS) ++QMAKE_LFLAGS += $(Q_LDFLAGS) +--- pushpin-1.0.0.orig/proxy/src/pro/libpushpin-proxy/libpushpin-proxy.pro ++++ pushpin-1.0.0/proxy/src/pro/libpushpin-proxy/libpushpin-proxy.pro +@@ -11,3 +11,8 @@ OBJECTS_DIR = $$OUT_PWD/_obj + + include($$OUT_PWD/../../../conf.pri) + include(libpushpin-proxy.pri) ++ ++QMAKE_CXXFLAGS += $(Q_CXXFLAGS) ++QMAKE_CFLAGS_DEBUG += $(Q_CFLAGS) ++QMAKE_CFLAGS_RELEASE += $(Q_CFLAGS) ++QMAKE_LFLAGS += $(Q_LDFLAGS) diff -Nru pushpin-0.0~20140620/debian/README.source pushpin-1.0.0/debian/README.source --- pushpin-0.0~20140620/debian/README.source 2014-06-26 10:49:42.000000000 +0000 +++ pushpin-1.0.0/debian/README.source 2014-09-18 20:28:11.000000000 +0000 @@ -1,8 +1,8 @@ pushpin for Debian ------------------ -As there is no release, yet, the source was downloaded from git. -(See changelog for upstream git commit id.) +Pushpin 1.0.0 was downloaded from http://packages.fanout.io/source/pushpin-1.0.0.tar.bz2 +on Sep 16, 2014. The file configure.exe was removed from the archive, as its source is not included. diff -Nru pushpin-0.0~20140620/debian/rules pushpin-1.0.0/debian/rules --- pushpin-0.0~20140620/debian/rules 2014-06-26 10:49:42.000000000 +0000 +++ pushpin-1.0.0/debian/rules 2014-09-18 20:28:11.000000000 +0000 @@ -4,10 +4,17 @@ # Uncomment this to turn on verbose mode. 
#export DH_VERBOSE=1 -export DEB_BUILD_HARDENING=1 +export DEB_BUILD_MAINT_OPTIONS = hardening=+all +export DEB_LDFLAGS_MAINT_APPEND = -Wl,--as-needed + +Q_CPPFLAGS:=$(shell dpkg-buildflags --get CPPFLAGS) +Q_CFLAGS:=$(shell dpkg-buildflags --get CFLAGS) $(Q_CPPFLAGS) +Q_CXXFLAGS:=$(shell dpkg-buildflags --get CXXFLAGS) $(Q_CPPFLAGS) +Q_LDFLAGS:=$(shell dpkg-buildflags --get LDFLAGS) +export Q_CPPFLAGS Q_CFLAGS Q_CXXFLAGS Q_LDFLAGS %: - dh $@ + dh $@ override_dh_auto_install: dh_auto_install diff -Nru pushpin-0.0~20140620/debian/source/lintian-overrides pushpin-1.0.0/debian/source/lintian-overrides --- pushpin-0.0~20140620/debian/source/lintian-overrides 2014-06-26 10:49:42.000000000 +0000 +++ pushpin-1.0.0/debian/source/lintian-overrides 1970-01-01 00:00:00.000000000 +0000 @@ -1,7 +0,0 @@ -# pushpin uses qconf to generate its makefile, and -# qconf doesn't honor the flags exported by dpkg-buildflags. -# -# Therefore, for now, ignore the lintian warning. In the -# long term, this should be fixed within qconf. -# (see e.g. https://lists.debian.org/debian-lint-maint/2013/06/msg00024.html) -pushpin source: build-depends-on-obsolete-package build-depends: hardening-wrapper => use dpkg-buildflags instead diff -Nru pushpin-0.0~20140620/doc/websocket-over-http.md pushpin-1.0.0/doc/websocket-over-http.md --- pushpin-0.0~20140620/doc/websocket-over-http.md 1970-01-01 00:00:00.000000000 +0000 +++ pushpin-1.0.0/doc/websocket-over-http.md 2014-09-15 17:04:41.000000000 +0000 @@ -0,0 +1,162 @@ +WebSocket-Over-HTTP Protocol +============================ + +The WebSocket-Over-HTTP protocol is a simple, text-based protocol for gatewaying between a WebSocket client and a conventional HTTP server. + +Why?? +----- + +GRIP (Generic Realtime Intermediary Protocol, used by Pushpin and Fanout.io) enables out-of-band message injection into WebSocket connections. 
Normally, using GRIP with WebSockets requires a WebSocket connection on both sides of the proxy: + + Client <--WS--> GRIP Proxy <--WS--> Server + +The GRIP Proxy is a publish/subscribe service. When the server has data to send spontaneously, it does not use its WebSocket connection to send the data. Rather, it uses an out-of-band publish command to the proxy (usually via HTTP POST). This means that the WebSocket connection between the proxy and the server is used almost exclusively for servicing incoming requests from the client. + +If the communication path between the proxy and the server only needs to handle request/response interactions, then HTTP becomes a viable alternative to a WebSocket: + + Client <--WS--> GRIP Proxy <--HTTP--> Server + +Using HTTP for communication between the proxy and server may be easier to maintain and scale since HTTP server tools are well established. Plus, if the server is merely doing stateless RPC processing, then HTTP is arguably a respectable choice for this tier in the service. + +Of course, the usefulness of this gatewaying is entirely dependent on the server having a way to send data to clients out-of-band. As such, it is recommended that the WebSocket-Over-HTTP protocol be used in combination with GRIP. Note, however, that the WebSocket-Over-HTTP protocol does not explicitly depend on GRIP. + +Protocol +-------- + +The gateway and server exchange WebSocket "events" via HTTP requests and responses. 
The following events are defined: + +* `OPEN` - WebSocket negotiation request or acknowledgement +* `TEXT`, `BINARY` - Messages with content +* `PING`, `PONG` - Ping and pong messages +* `CLOSE` - Close message with 16-bit close code +* `DISCONNECT` - Indicates connection closed uncleanly or does not exist + +Events are encoded in a format similar to HTTP chunked transfer encoding: + + TEXT B\r\n + hello world\r\n + +The format is the name of the event, a space, the hexadecimal encoding of the content size, a carriage return and newline, the content bytes, and finally another carriage return and newline. + +For events with no content, the size and content section can be omitted: + + OPEN\r\n + +Events with content are TEXT, BINARY, and CLOSE. Events without content are OPEN, PING, PONG, and DISCONNECT. + +An event that should not contain content MAY be encoded with content. Receivers should ignore such content. For example, this is legal: + + OPEN 0\r\n + \r\n + +One or more encoded events are then concatenated and placed in the body of an HTTP request or response, with content type `application/websocket-events`. + +Example +------- + +Gateway opens connection: + + POST /target HTTP/1.1 + Connection-Id: b5ea0e11 + Content-Type: application/websocket-events + [... any headers included by the client WebSocket handshake ...] + + OPEN\r\n + \r\n + +Server accepts connection: + + HTTP/1.1 200 OK + Content-Type: application/websocket-events + [... any headers to include in the WebSocket negotiation response ...] 
+ + OPEN\r\n + \r\n +Gateway relays message from client: + + POST /target HTTP/1.1 + Connection-Id: b5ea0e11 + Content-Type: application/websocket-events + + TEXT 5\r\n + hello\r\n + +Server responds with two messages: + + HTTP/1.1 200 OK + Content-Type: application/websocket-events + + TEXT 5\r\n + world\r\n + TEXT 1C\r\n + here is another nice message\r\n + +Gateway relays a close message: + + POST /target HTTP/1.1 + Connection-Id: b5ea0e11 + Content-Type: application/websocket-events + + CLOSE 2\r\n + [... binary status code ...]\r\n + +Server sends a close message back: + + HTTP/1.1 200 OK + Content-Type: application/websocket-events + + CLOSE 2\r\n + [... binary status code ...]\r\n + +State Management +---------------- + +Headers of the initial WebSocket negotiation request MUST be replayed with every request made by the gateway. This means that if the client uses cookies or other headers for authentication purposes, the server will receive this data with every message. + +The gateway includes a `Connection-Id` header which uniquely identifies a particular client connection. Servers that need to track connections can use this. In most cases, though, servers should not have to care about connections. + +It is possible to bind metadata to the connection via a `Set-Meta-*` header. This works similarly to a cookie. The server can set a field that the gateway should echo back on all subsequent requests. + +For example, a client supplies a cookie which the gateway relays across during connect: + + POST /target HTTP/1.1 + Connection-Id: b5ea0e11 + Content-Type: application/websocket-events + Cookie: [... auth info ...] 
+ + OPEN\r\n + \r\n + +The server accepts the connection and binds a User field based on the cookie: + + HTTP/1.1 200 OK + Content-Type: application/websocket-events + Set-Meta-User: alice + + OPEN\r\n + \r\n + +Now, any further requests from the gateway will include a Meta-User header: + + POST /target HTTP/1.1 + Connection-Id: b5ea0e11 + Meta-User: alice + Content-Type: application/websocket-events + + TEXT 5\r\n + hello\r\n + +Security note: gateways MUST NOT relay any headers from the client that are prefixed with `Meta-`. This prevents the client from spoofing metadata bindings. Additionally, the server needs to ensure that an incoming request came from a gateway before trusting its `Meta-*` headers. + +Notes +----- + +* The first request MUST contain an OPEN event as the first event. +* The first response MUST contain an OPEN event as the first event. +* If the server tracks connections and no longer considers the connection to exist, it should respond with DISCONNECT. In most cases, servers will not track connections, though. +* Gateway should only have one outstanding request per client connection. This ensures in-order delivery. +* DISCONNECT event only sent if connection was not closed cleanly. With clean close, disconnect is implied. +* Within this protocol alone, the server has no way to talk to the client outside of responding to incoming requests. +* Gateway can send an empty request to keep-alive the current connection. The gateway shall consider an empty response to be a keep-alive from the server. The server enables keep-alives by providing a Keep-Alive-Interval response header. 
+ diff -Nru pushpin-0.0~20140620/.gitmodules pushpin-1.0.0/.gitmodules --- pushpin-0.0~20140620/.gitmodules 2014-06-25 13:18:48.000000000 +0000 +++ pushpin-1.0.0/.gitmodules 1970-01-01 00:00:00.000000000 +0000 @@ -1,6 +0,0 @@ -[submodule "common"] - path = common - url = git://github.com/fanout/common.git -[submodule "qzmq"] - path = qzmq - url = git://github.com/jkarneges/qzmq.git diff -Nru pushpin-0.0~20140620/handler/pushpin-handler pushpin-1.0.0/handler/pushpin-handler --- pushpin-0.0~20140620/handler/pushpin-handler 2014-06-25 13:18:48.000000000 +0000 +++ pushpin-1.0.0/handler/pushpin-handler 2014-09-15 17:04:41.000000000 +0000 @@ -21,6 +21,8 @@ import os import ConfigParser +version = '1.0.0' + config_file = "/etc/pushpin/pushpin.conf" log_file = None verbose = False @@ -29,8 +31,11 @@ config_file = arg[9:] elif arg.startswith("--logfile="): log_file = arg[10:] - elif arg.startswith("--verbose"): + elif arg == "--verbose": verbose = True + elif arg == "--version": + print 'pushpin-handler %s' % version + sys.exit(1) class ConfigWithInclude(object): def __init__(self, fname): @@ -230,6 +235,13 @@ def remove_from_response_channels(rid): remove_from_req_channels(rid, response=True, stream=False) +def headernames_contains(headers, name): + lname = name.lower() + for i in headers: + if i.lower() == lname: + return True + return False + def header_get(headers, name): lname = name.lower() if isinstance(headers, list): @@ -242,6 +254,27 @@ return v return None +# return list of strings +def header_get_all(headers, name): + lname = name.lower() + hvals = list() + if isinstance(headers, list): + for i in headers: + if i[0].lower() == lname: + hvals.append(i[1]) + else: + for k, v in headers.iteritems(): + if k.lower() == lname: + hvals.append(v) + out = list() + for hval in hvals: + parts = hval.split(',') + for p in parts: + p = p.strip() + if p: + out.append(p) + return out + def header_remove(headers, name): lname = name.lower() if isinstance(headers, list): @@ 
-266,6 +299,39 @@ return True return False +# return (initial value, params dict) +def _parse_header_params(value): + parts = value.split(';') + v = parts[0].strip() + params = dict() + for n in range(1, len(parts)): + part = parts[n].strip() + at = part.find('=') + if at != -1: + pname = part[:at] + pval = part[at + 1:] + else: + pname = part + pval = None + if pname: + params[pname] = pval + return (v, params) + +# return (initial value, params dict) or None +def header_get_parsed(headers, name): + h = header_get(headers, name) + if h is None: + return None + return _parse_header_params(h) + +# return list of (initial value, params dict) +def header_get_all_parsed(headers, name): + out = list() + hlist = header_get_all(headers, name) + for h in hlist: + out.append(_parse_header_params(h)) + return out + HTTP_FORMAT = "HTTP/1.1 %(code)s %(status)s\r\n%(headers)s\r\n\r\n%(body)s" HTTP_FORMAT_NOHEADERS = "HTTP/1.1 %(code)s %(status)s\r\n\r\n%(body)s" @@ -398,7 +464,7 @@ if not header_get(response_headers, "Access-Control-Allow-Headers"): acr_headers = header_get(request_headers, "Access-Control-Request-Headers") - allow_headers = list(); + allow_headers = list() if acr_headers: for name in acr_headers.split(","): name = name.strip() @@ -470,14 +536,9 @@ m = tnetstring.loads(m_raw) logger.debug("IN accept: %s" % m) + reqs = m["requests"] + try: - instruct = json.loads(m["response"]["body"]) - hold = instruct["hold"] - mode = hold.get("mode") - if mode is None: - mode = "response" - if mode != "response" and mode != "stream": - raise ValueError("bad mode") if "route" in m: route = m["route"] else: @@ -486,36 +547,84 @@ channel_prefix = m["channel-prefix"] else: channel_prefix = "" + + mode = None channels = list() - for hc in hold["channels"]: - name = channel_prefix + hc["name"] - prev_id = hc.get("prev-id") - channels.append((name, prev_id)) - if "timeout" in hold: - timeout = int(hold["timeout"]) - else: - timeout = 55 - response = instruct.get("response") - if 
response is None: - response = dict() - response["body"] = "" - if "headers" in response and isinstance(response["headers"], list): - d = dict() - for i in response["headers"]: - d[i[0]] = i[1] - response["headers"] = d - if "body-bin" in response: - response["body"] = b64decode(response["body-bin"]) - del response["body-bin"] - elif "body" in response: - response["body"] = response["body"].encode("utf-8") + timeout = 55 + resp_headers = m['response'].get('headers') or {} + + # headers-based grip + grip_hold = header_get_parsed(resp_headers, 'Grip-Hold') + if grip_hold: + mode = grip_hold[0] + grip_channels = header_get_all_parsed(resp_headers, 'Grip-Channel') + if grip_channels: + for c in grip_channels: + name = channel_prefix + c[0] + prev_id = c[1].get('prev-id') + channels.append((name, prev_id)) + grip_timeout = header_get_parsed(resp_headers, 'Grip-Timeout') + if grip_timeout: + timeout = int(grip_timeout[0]) + grip_expose_headers = header_get_all(resp_headers, 'Grip-Expose-Headers') + + instruct = None + ctype = header_get_parsed(resp_headers, 'Content-Type') + if ctype[0] == 'application/grip-instruct': + if m['response']['code'] != 200: + raise ValueError('response code for grip-instruct must be 200') + instruct = json.loads(m['response']['body']) + hold = instruct['hold'] + mode = hold.get('mode') + if mode is None: + mode = 'response' + for hc in hold['channels']: + name = channel_prefix + hc['name'] + prev_id = hc.get('prev-id') + channels.append((name, prev_id)) + if 'timeout' in hold: + timeout = int(hold['timeout']) + response = instruct.get('response') + if response is None: + response = dict() + response['body'] = '' + if "headers" in response and isinstance(response["headers"], list): + d = dict() + for i in response["headers"]: + d[i[0]] = i[1] + response["headers"] = d + if "body-bin" in response: + response["body"] = b64decode(response["body-bin"]) + del response["body-bin"] + elif "body" in response: + response["body"] = 
response["body"].encode("utf-8") + else: + response["body"] = "" else: - response["body"] = "" + response = m['response'] + if "headers" in response and isinstance(response["headers"], list): + d = dict() + for i in response["headers"]: + if i[0].lower().startswith('grip-'): + continue + if grip_expose_headers and not headernames_contains(grip_expose_headers, i[0]): + continue + d[i[0]] = i[1] + response["headers"] = d + if 'body' not in response: + response['body'] = '' + + if mode != 'response' and mode != 'stream': + raise ValueError('bad mode') except: logger.debug("failed to parse accept instructions") + for req in reqs: + rid = (req["rid"]["sender"], req["rid"]["id"]) + rheaders = dict() + rheaders['Content-Type'] = 'text/plain' + reply_http(out_sock, rid, 502, 'Bad Gateway', rheaders, 'Error while proxying to origin.\n') continue - reqs = m["requests"] logger.debug("accepting %d requests" % len(reqs)) for req in reqs: @@ -792,15 +901,35 @@ else: pbody = "" + grip_expose_headers = header_get_all(pheaders, 'Grip-Expose-Headers') + header_remove(pheaders, 'Grip-Expose-Headers') + for n, h in enumerate(holds): + # inherit any headers from the timeout response + if 'headers' in h.response: + rheaders = copy.deepcopy(h.response['headers']) + else: + rheaders = dict() + + # apply the headers from the pushed message + for k, v in pheaders.iteritems(): + rheaders[k] = v + + # if Grip-Expose-Headers was provided in the pushed message, filter the results + if grip_expose_headers: + rkeys = rheaders.keys() + for k in rkeys: + if not headernames_contains(grip_expose_headers, k): + del rheaders[k] + headers = dict() if h.jsonp_callback: result = dict() result["code"] = pcode result["reason"] = preason result["headers"] = dict() - if pheaders: - for k, v in pheaders.iteritems(): + if rheaders: + for k, v in rheaders.iteritems(): result["headers"][k] = v header_set(result["headers"], "Content-Length", str(len(pbody))) result["body"] = pbody @@ -810,8 +939,8 @@ 
header_set(headers, "Content-Length", str(len(body))) reply_http(out_sock, h.rid, 200, "OK", headers, body) else: - if pheaders: - for k, v in pheaders.iteritems(): + if rheaders: + for k, v in rheaders.iteritems(): headers[k] = v if h.auto_cross_origin: diff -Nru pushpin-0.0~20140620/proxy/src/app.cpp pushpin-1.0.0/proxy/src/app.cpp --- pushpin-0.0~20140620/proxy/src/app.cpp 2014-06-25 13:18:48.000000000 +0000 +++ pushpin-1.0.0/proxy/src/app.cpp 2014-09-15 17:04:41.000000000 +0000 @@ -31,7 +31,7 @@ #include "xffrule.h" #include "engine.h" -#define VERSION "1.0" +#define VERSION "1.0.0" static void trimlist(QStringList *list) { diff -Nru pushpin-0.0~20140620/proxy/src/domainmap.cpp pushpin-1.0.0/proxy/src/domainmap.cpp --- pushpin-0.0~20140620/proxy/src/domainmap.cpp 2014-06-25 13:18:48.000000000 +0000 +++ pushpin-1.0.0/proxy/src/domainmap.cpp 2014-09-15 17:04:41.000000000 +0000 @@ -399,6 +399,9 @@ if(props.contains("sub")) target.subChannel = props.value("sub"); + if(props.contains("over_http")) + target.overHttp = true; + r.targets += target; } diff -Nru pushpin-0.0~20140620/proxy/src/domainmap.h pushpin-1.0.0/proxy/src/domainmap.h --- pushpin-0.0~20140620/proxy/src/domainmap.h 2014-06-25 13:18:48.000000000 +0000 +++ pushpin-1.0.0/proxy/src/domainmap.h 2014-09-15 17:04:41.000000000 +0000 @@ -45,12 +45,14 @@ bool insecure; // ignore server certificate validity QString host; // override input host QString subChannel; // force subscription for websocket test + bool overHttp; // use websocket-over-http protocol Target() : connectPort(-1), ssl(false), trusted(false), - insecure(false) + insecure(false), + overHttp(false) { } }; diff -Nru pushpin-0.0~20140620/proxy/src/pro/libpushpin-proxy/libpushpin-proxy.pri pushpin-1.0.0/proxy/src/pro/libpushpin-proxy/libpushpin-proxy.pri --- pushpin-0.0~20140620/proxy/src/pro/libpushpin-proxy/libpushpin-proxy.pri 2014-06-25 13:18:48.000000000 +0000 +++ pushpin-1.0.0/proxy/src/pro/libpushpin-proxy/libpushpin-proxy.pri 2014-09-15 
17:04:41.000000000 +0000 @@ -54,9 +54,11 @@ HEADERS += \ $$SRC_DIR/jwt.h \ + $$SRC_DIR/websocket.h \ $$SRC_DIR/zhttpmanager.h \ $$SRC_DIR/zhttprequest.h \ $$SRC_DIR/zwebsocket.h \ + $$SRC_DIR/websocketoverhttp.h \ $$SRC_DIR/inspectdata.h \ $$SRC_DIR/inspectmanager.h \ $$SRC_DIR/inspectrequest.h \ @@ -80,6 +82,7 @@ $$SRC_DIR/zhttpmanager.cpp \ $$SRC_DIR/zhttprequest.cpp \ $$SRC_DIR/zwebsocket.cpp \ + $$SRC_DIR/websocketoverhttp.cpp \ $$SRC_DIR/inspectmanager.cpp \ $$SRC_DIR/inspectrequest.cpp \ $$SRC_DIR/wscontrolmanager.cpp \ diff -Nru pushpin-0.0~20140620/proxy/src/proxysession.cpp pushpin-1.0.0/proxy/src/proxysession.cpp --- pushpin-0.0~20140620/proxy/src/proxysession.cpp 2014-06-25 13:18:48.000000000 +0000 +++ pushpin-1.0.0/proxy/src/proxysession.cpp 2014-09-15 17:04:41.000000000 +0000 @@ -536,7 +536,7 @@ if(at != -1) contentType = contentType.mid(0, at); - if(!passToUpstream && acceptTypes.contains(contentType)) + if(!passToUpstream && (responseData.headers.contains("Grip-Hold") || acceptTypes.contains(contentType))) { if(!buffering) { diff -Nru pushpin-0.0~20140620/proxy/src/websocket.h pushpin-1.0.0/proxy/src/websocket.h --- pushpin-0.0~20140620/proxy/src/websocket.h 1970-01-01 00:00:00.000000000 +0000 +++ pushpin-1.0.0/proxy/src/websocket.h 2014-09-15 17:04:41.000000000 +0000 @@ -0,0 +1,117 @@ +/* + * Copyright (C) 2014 Fanout, Inc. + * + * This file is part of Pushpin. + * + * Pushpin is free software: you can redistribute it and/or modify it under + * the terms of the GNU Affero General Public License as published by the Free + * Software Foundation, either version 3 of the License, or (at your option) + * any later version. + * + * Pushpin is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for + * more details. 
+ * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef WEBSOCKET_H +#define WEBSOCKET_H + +#include +#include +#include +#include "httpheaders.h" + +class WebSocket : public QObject +{ + Q_OBJECT + +public: + enum State + { + Idle, + Connecting, + Connected, + Closing + }; + + enum ErrorCondition + { + ErrorGeneric, + ErrorPolicy, + ErrorConnect, + ErrorConnectTimeout, + ErrorTls, + ErrorRejected, + ErrorTimeout, + ErrorUnavailable + }; + + class Frame + { + public: + enum Type + { + Continuation, + Text, + Binary, + Ping, + Pong + }; + + Type type; + QByteArray data; + bool more; + + Frame(Type _type, const QByteArray &_data, bool _more) : + type(_type), + data(_data), + more(_more) + { + } + }; + + WebSocket(QObject *parent = 0) : QObject(parent) {} + + virtual QHostAddress peerAddress() const = 0; + + virtual void setConnectHost(const QString &host) = 0; + virtual void setConnectPort(int port) = 0; + virtual void setIgnorePolicies(bool on) = 0; + virtual void setIgnoreTlsErrors(bool on) = 0; + + virtual void start(const QUrl &uri, const HttpHeaders &headers) = 0; + + virtual void respondSuccess(const QByteArray &reason, const HttpHeaders &headers) = 0; + virtual void respondError(int code, const QByteArray &reason, const HttpHeaders &headers, const QByteArray &body) = 0; + + virtual State state() const = 0; + virtual QUrl requestUri() const = 0; + virtual HttpHeaders requestHeaders() const = 0; + virtual int responseCode() const = 0; + virtual QByteArray responseReason() const = 0; + virtual HttpHeaders responseHeaders() const = 0; + virtual QByteArray responseBody() const = 0; + virtual int framesAvailable() const = 0; + virtual bool canWrite() const = 0; + virtual int writeBytesAvailable() const = 0; + virtual int peerCloseCode() const = 0; + virtual ErrorCondition errorCondition() const = 0; + + virtual void writeFrame(const Frame &frame) = 0; + virtual Frame readFrame() = 0; 
+ virtual void close(int code = -1) = 0; + +signals: + void connected(); + void readyRead(); + void framesWritten(int count, int contentBytes); + void peerClosed(); // emitted only if peer closes before we do + void closed(); // emitted after peer acks our close, or immediately if we were acking + void error(); +}; + +#endif diff -Nru pushpin-0.0~20140620/proxy/src/websocketoverhttp.cpp pushpin-1.0.0/proxy/src/websocketoverhttp.cpp --- pushpin-0.0~20140620/proxy/src/websocketoverhttp.cpp 1970-01-01 00:00:00.000000000 +0000 +++ pushpin-1.0.0/proxy/src/websocketoverhttp.cpp 2014-09-15 17:04:41.000000000 +0000 @@ -0,0 +1,709 @@ +/* + * Copyright (C) 2014 Fanout, Inc. + * + * This file is part of Pushpin. + * + * Pushpin is free software: you can redistribute it and/or modify it under + * the terms of the GNU Affero General Public License as published by the Free + * Software Foundation, either version 3 of the License, or (at your option) + * any later version. + * + * Pushpin is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for + * more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */ + +#include "websocketoverhttp.h" + +#include +#include +#include +#include +#include "log.h" +#include "bufferlist.h" +#include "packet/httprequestdata.h" +#include "packet/httpresponsedata.h" +#include "zhttprequest.h" +#include "zhttpmanager.h" + +#define BUFFER_SIZE 200000 + +class WsEvent +{ +public: + QByteArray type; + QByteArray content; + + WsEvent() + { + } + + WsEvent(const QByteArray &_type, const QByteArray &_content = QByteArray()) : + type(_type), + content(_content) + { + } +}; + +static QList decodeEvents(const QByteArray &in, bool *ok = 0) +{ + QList out; + if(ok) + *ok = false; + + int start = 0; + while(start < in.size()) + { + int at = in.indexOf("\r\n", start); + if(at == -1) + return QList(); + + QByteArray typeLine = in.mid(start, at - start); + start = at + 2; + + WsEvent e; + at = typeLine.indexOf(' '); + if(at != -1) + { + e.type = typeLine.mid(0, at); + + bool check; + int clen = typeLine.mid(at + 1).toInt(&check, 16); + if(!check) + return QList(); + + e.content = in.mid(start, clen); + start += clen + 2; + } + else + { + e.type = typeLine; + } + + out += e; + } + + if(ok) + *ok = true; + return out; +} + +static QByteArray encodeEvents(const QList &events) +{ + QByteArray out; + + foreach(const WsEvent &e, events) + { + if(!e.content.isNull()) + { + out += e.type + ' ' + QByteArray::number(e.content.size(), 16) + "\r\n" + e.content + "\r\n"; + } + else + { + out += e.type + "\r\n"; + } + } + + return out; +} + +class WebSocketOverHttp::Private : public QObject +{ + Q_OBJECT + +public: + WebSocketOverHttp *q; + ZhttpManager *zhttpManager; + QString connectHost; + int connectPort; + bool ignorePolicies; + bool ignoreTlsErrors; + State state; + QByteArray cid; + HttpRequestData requestData; + HttpResponseData responseData; + ErrorCondition errorCondition; + int keepAliveInterval; + HttpHeaders meta; + bool updating; + ZhttpRequest *req; + int reqFrames; + int reqContentSize; + bool reqClose; + BufferList inBuf; + QList inFrames; + 
QList outFrames; + int closeCode; + bool closeSent; + bool peerClosing; + int peerCloseCode; + QTimer *keepAliveTimer; + + Private(WebSocketOverHttp *_q) : + QObject(_q), + q(_q), + connectPort(-1), + ignorePolicies(false), + ignoreTlsErrors(false), + state(WebSocket::Idle), + errorCondition(WebSocket::ErrorGeneric), + keepAliveInterval(-1), + updating(false), + req(0), + reqFrames(0), + reqContentSize(0), + reqClose(false), + closeCode(-1), + closeSent(false), + peerClosing(false), + peerCloseCode(-1) + { + keepAliveTimer = new QTimer(this); + connect(keepAliveTimer, SIGNAL(timeout()), SLOT(keepAliveTimer_timeout())); + keepAliveTimer->setSingleShot(true); + } + + ~Private() + { + keepAliveTimer->disconnect(this); + keepAliveTimer->setParent(0); + keepAliveTimer->deleteLater(); + } + + void start() + { + state = Connecting; + + cid = QUuid::createUuid().toString().toLatin1(); + + // don't forward certain headers + requestData.headers.removeAll("Upgrade"); + requestData.headers.removeAll("Accept"); + requestData.headers.removeAll("Connection-Id"); + + // don't forward headers starting with Meta-* + for(int n = 0; n < requestData.headers.count(); ++n) + { + const HttpHeader &h = requestData.headers[n]; + if(qstrnicmp(h.first.data(), "Meta-", 5) == 0) + { + requestData.headers.removeAt(n); + --n; // adjust position + } + } + + if(requestData.uri.scheme() == "wss") + requestData.uri.setScheme("https"); + else + requestData.uri.setScheme("http"); + + update(); + } + + void writeFrame(const Frame &frame) + { + assert(state != Closing); + + outFrames += frame; + + update(); + } + + Frame readFrame() + { + return inFrames.takeFirst(); + } + + void close(int code) + { + assert(state != Closing); + + state = Closing; + closeCode = code; + + update(); + } + +private: + void update() + { + // only one request allowed at a time + if(updating) + return; + + updating = true; + + keepAliveTimer->stop(); + + req = zhttpManager->createRequest(); + req->setParent(this); + 
connect(req, SIGNAL(readyRead()), SLOT(req_readyRead())); + connect(req, SIGNAL(bytesWritten(int)), SLOT(req_bytesWritten(int))); + connect(req, SIGNAL(error()), SLOT(req_error())); + + if(!connectHost.isEmpty()) + req->setConnectHost(connectHost); + if(connectPort != -1) + req->setConnectPort(connectPort); + req->setIgnorePolicies(ignorePolicies); + req->setIgnoreTlsErrors(ignoreTlsErrors); + + HttpHeaders headers = requestData.headers; + + headers += HttpHeader("Accept", "application/websocket-events"); + headers += HttpHeader("Connection-Id", cid); + + foreach(const HttpHeader &h, meta) + headers += HttpHeader("Meta-" + h.first, h.second); + + req->start("POST", requestData.uri, headers); + + reqFrames = 0; + reqContentSize = 0; + reqClose = false; + + QList events; + + if(state == Connecting) + { + events += WsEvent("OPEN"); + } + else + { + while(!outFrames.isEmpty()) + { + Frame f = outFrames.takeFirst(); + if(f.type == Frame::Text) + events += WsEvent("TEXT", f.data); + else if(f.type == Frame::Binary) + events += WsEvent("BINARY", f.data); + else if(f.type == Frame::Ping) + events += WsEvent("PING"); + else if(f.type == Frame::Pong) + events += WsEvent("PONG"); + + ++reqFrames; + reqContentSize += f.data.size(); + } + + if(state == Closing) + { + QByteArray buf(2, 0); + buf[0] = (closeCode >> 8) & 0xff; + buf[1] = closeCode & 0xff; + events += WsEvent("CLOSE", buf); + + reqClose = true; + } + } + + if(!events.isEmpty()) + req->writeBody(encodeEvents(events)); + + req->endBody(); + } + +private slots: + void req_readyRead() + { + inBuf += req->readBody(); + + if(!req->isFinished()) + { + updating = false; + return; + } + + int responseCode = req->responseCode(); + QByteArray responseReason = req->responseReason(); + HttpHeaders responseHeaders = req->responseHeaders(); + QByteArray responseBody = req->readBody(); + + delete req; + req = 0; + + if(responseCode != 200) + { + updating = false; + + emit q->error(); + return; + } + + QByteArray contentType = 
responseHeaders.get("Content-Type"); + if(contentType != "application/websocket-events") + { + updating = false; + + emit q->error(); + return; + } + + if(responseHeaders.contains("Keep-Alive-Interval")) + { + bool ok; + int x = responseHeaders.get("Keep-Alive-Interval").toInt(&ok); + if(ok && x > 0) + { + if(x < 20) + x = 20; + + keepAliveInterval = x; + } + else + keepAliveInterval = -1; + } + + foreach(const HttpHeader &h, responseHeaders) + { + if(h.first.size() >= 10 && qstrnicmp(h.first.data(), "Set-Meta-", 9) == 0) + { + QByteArray name = h.first.mid(9); + if(meta.contains(name)) + meta.removeAll(name); + meta += HttpHeader(name, h.second); + } + } + + bool ok; + QList events = decodeEvents(inBuf.take(), &ok); + if(!ok) + { + updating = false; + + emit q->error(); + return; + } + + if(state == Connecting) + { + // server must respond with events or enable keep alive + if(events.isEmpty() && keepAliveInterval == -1) + { + updating = false; + + emit q->error(); + return; + } + + // first event must be OPEN + if(!events.isEmpty() && events.first().type != "OPEN") + { + updating = false; + + emit q->error(); + return; + } + } + + QPointer self = this; + + bool emitConnected = false; + bool emitReadyRead = false; + bool closed = false; + bool disconnected = false; + + foreach(const WsEvent &e, events) + { + if(e.type == "OPEN") + { + if(state != Connecting) + { + disconnected = true; + break; + } + + // save the initial response + responseData.code = responseCode; + responseData.reason = responseReason; + responseData.headers = responseHeaders; + responseData.body = responseBody; + + state = Connected; + emitConnected = true; + } + else if(e.type == "TEXT") + { + inFrames += Frame(Frame::Text, e.content, false); + emitReadyRead = true; + } + else if(e.type == "BINARY") + { + inFrames += Frame(Frame::Binary, e.content, false); + emitReadyRead = true; + } + else if(e.type == "PING") + { + inFrames += Frame(Frame::Ping, QByteArray(), false); + emitReadyRead = true; 
+ } + else if(e.type == "PONG") + { + inFrames += Frame(Frame::Pong, QByteArray(), false); + emitReadyRead = true; + } + else if(e.type == "CLOSE") + { + peerClosing = true; + if(e.content.size() == 2) + peerCloseCode = ((quint16)e.content[0] << 8) + (quint16)e.content[1]; + + closed = true; + break; + } + else if(e.type == "DISCONNECT") + { + disconnected = true; + break; + } + } + + if(emitConnected) + { + emit q->connected(); + if(!self) + return; + } + + if(emitReadyRead) + { + emit q->readyRead(); + if(!self) + return; + } + + if(reqFrames > 0) + { + emit q->framesWritten(reqFrames, reqContentSize); + if(!self) + return; + } + + if(reqClose) + closeSent = true; + + if(closed) + { + if(closeSent) + { + updating = false; + + state = Idle; + emit q->closed(); + return; + } + else + { + emit q->peerClosed(); + } + } + else if(closeSent && keepAliveInterval == -1) + { + // if there are no keep alives, then the server has only one + // chance to respond to a close. if it doesn't, then + // consider the connection uncleanly disconnected. 
+ disconnected = true; + } + + if(disconnected) + { + updating = false; + + emit q->error(); + return; + } + + if(reqClose && peerClosing) + { + updating = false; + + state = Idle; + emit q->closed(); + return; + } + + updating = false; + + if(!outFrames.isEmpty() || (state == Closing && !closeSent)) + update(); + else if(keepAliveInterval != -1) + keepAliveTimer->start(keepAliveInterval * 1000); + } + + void req_bytesWritten(int count) + { + Q_UNUSED(count); + + // nothing to do here + } + + void req_error() + { + delete req; + req = 0; + + emit q->error(); + } + + void keepAliveTimer_timeout() + { + update(); + } +}; + +WebSocketOverHttp::WebSocketOverHttp(ZhttpManager *zhttpManager, QObject *parent) : + WebSocket(parent) +{ + d = new Private(this); + d->zhttpManager = zhttpManager; +} + +WebSocketOverHttp::~WebSocketOverHttp() +{ + delete d; +} + +QHostAddress WebSocketOverHttp::peerAddress() const +{ + // this class is client only + return QHostAddress(); +} + +void WebSocketOverHttp::setConnectHost(const QString &host) +{ + d->connectHost = host; +} + +void WebSocketOverHttp::setConnectPort(int port) +{ + d->connectPort = port; +} + +void WebSocketOverHttp::setIgnorePolicies(bool on) +{ + d->ignorePolicies = on; +} + +void WebSocketOverHttp::setIgnoreTlsErrors(bool on) +{ + d->ignoreTlsErrors = on; +} + +void WebSocketOverHttp::start(const QUrl &uri, const HttpHeaders &headers) +{ + assert(d->state == Idle); + + d->requestData.uri = uri; + d->requestData.headers = headers; + d->start(); +} + +void WebSocketOverHttp::respondSuccess(const QByteArray &reason, const HttpHeaders &headers) +{ + Q_UNUSED(reason); + Q_UNUSED(headers); + + // this class is client only + assert(0); +} + +void WebSocketOverHttp::respondError(int code, const QByteArray &reason, const HttpHeaders &headers, const QByteArray &body) +{ + Q_UNUSED(code); + Q_UNUSED(reason); + Q_UNUSED(headers); + Q_UNUSED(body); + + // this class is client only + assert(0); +} + +WebSocket::State 
WebSocketOverHttp::state() const +{ + return d->state; +} + +QUrl WebSocketOverHttp::requestUri() const +{ + return d->requestData.uri; +} + +HttpHeaders WebSocketOverHttp::requestHeaders() const +{ + return d->requestData.headers; +} + +int WebSocketOverHttp::responseCode() const +{ + return d->responseData.code; +} + +QByteArray WebSocketOverHttp::responseReason() const +{ + return d->responseData.reason; +} + +HttpHeaders WebSocketOverHttp::responseHeaders() const +{ + return d->responseData.headers; +} + +QByteArray WebSocketOverHttp::responseBody() const +{ + return d->responseData.body; +} + +int WebSocketOverHttp::framesAvailable() const +{ + return d->inFrames.count(); +} + +bool WebSocketOverHttp::canWrite() const +{ + return (writeBytesAvailable() > 0); +} + +int WebSocketOverHttp::writeBytesAvailable() const +{ + int avail = BUFFER_SIZE; + foreach(const Frame &f, d->outFrames) + { + if(f.data.size() >= avail) + return 0; + + avail -= f.data.size(); + } + + return avail; +} + +int WebSocketOverHttp::peerCloseCode() const +{ + return d->peerCloseCode; +} + +WebSocket::ErrorCondition WebSocketOverHttp::errorCondition() const +{ + return d->errorCondition; +} + +void WebSocketOverHttp::writeFrame(const Frame &frame) +{ + d->writeFrame(frame); +} + +WebSocket::Frame WebSocketOverHttp::readFrame() +{ + return d->readFrame(); +} + +void WebSocketOverHttp::close(int code) +{ + d->close(code); +} + +#include "websocketoverhttp.moc" diff -Nru pushpin-0.0~20140620/proxy/src/websocketoverhttp.h pushpin-1.0.0/proxy/src/websocketoverhttp.h --- pushpin-0.0~20140620/proxy/src/websocketoverhttp.h 1970-01-01 00:00:00.000000000 +0000 +++ pushpin-1.0.0/proxy/src/websocketoverhttp.h 2014-09-15 17:04:41.000000000 +0000 @@ -0,0 +1,84 @@ +/* + * Copyright (C) 2014 Fanout, Inc. + * + * This file is part of Pushpin. 
+ * + * Pushpin is free software: you can redistribute it and/or modify it under + * the terms of the GNU Affero General Public License as published by the Free + * Software Foundation, either version 3 of the License, or (at your option) + * any later version. + * + * Pushpin is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for + * more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef WEBSOCKETOVERHTTP_H +#define WEBSOCKETOVERHTTP_H + +#include +#include +#include +#include "httpheaders.h" +#include "websocket.h" + +class ZhttpManager; + +class WebSocketOverHttp : public WebSocket +{ + Q_OBJECT + +public: + WebSocketOverHttp(ZhttpManager *zhttpManager, QObject *parent = 0); + ~WebSocketOverHttp(); + + // reimplemented + + virtual QHostAddress peerAddress() const; + + virtual void setConnectHost(const QString &host); + virtual void setConnectPort(int port); + virtual void setIgnorePolicies(bool on); + virtual void setIgnoreTlsErrors(bool on); + + virtual void start(const QUrl &uri, const HttpHeaders &headers); + + virtual void respondSuccess(const QByteArray &reason, const HttpHeaders &headers); + virtual void respondError(int code, const QByteArray &reason, const HttpHeaders &headers, const QByteArray &body); + + virtual State state() const; + virtual QUrl requestUri() const; + virtual HttpHeaders requestHeaders() const; + virtual int responseCode() const; + virtual QByteArray responseReason() const; + virtual HttpHeaders responseHeaders() const; + virtual QByteArray responseBody() const; + virtual int framesAvailable() const; + virtual bool canWrite() const; + virtual int writeBytesAvailable() const; + virtual int peerCloseCode() const; + virtual ErrorCondition errorCondition() const; + + virtual 
void writeFrame(const Frame &frame); + virtual Frame readFrame(); + virtual void close(int code = -1); + +signals: + void connected(); + void readyRead(); + void framesWritten(int count, int contentBytes); + void peerClosed(); + void closed(); + void error(); + +private: + class Private; + friend class Private; + Private *d; +}; + +#endif diff -Nru pushpin-0.0~20140620/proxy/src/wsproxysession.cpp pushpin-1.0.0/proxy/src/wsproxysession.cpp --- pushpin-0.0~20140620/proxy/src/wsproxysession.cpp 2014-06-25 13:18:48.000000000 +0000 +++ pushpin-1.0.0/proxy/src/wsproxysession.cpp 2014-09-15 17:04:41.000000000 +0000 @@ -28,6 +28,7 @@ #include "log.h" #include "zhttpmanager.h" #include "zwebsocket.h" +#include "websocketoverhttp.h" #include "domainmap.h" #include "wscontrolmanager.h" #include "wscontrolsession.h" @@ -231,7 +232,7 @@ HttpRequestData requestData; ZWebSocket::Rid rid; ZWebSocket *inSock; - ZWebSocket *outSock; + WebSocket *outSock; int inPendingBytes; int outPendingBytes; int outReadInProgress; // frame type or -1 @@ -385,8 +386,16 @@ log_debug("wsproxysession: %p forwarding to %s:%d", q, qPrintable(target.connectHost), target.connectPort); - outSock = zhttpManager->createSocket(); - outSock->setParent(this); + if(target.overHttp) + { + outSock = new WebSocketOverHttp(zhttpManager, this); + } + else + { + outSock = zhttpManager->createSocket(); + outSock->setParent(this); + } + connect(outSock, SIGNAL(connected()), SLOT(out_connected())); connect(outSock, SIGNAL(readyRead()), SLOT(out_readyRead())); connect(outSock, SIGNAL(framesWritten(int, int)), SLOT(out_framesWritten(int, int))); @@ -421,9 +430,9 @@ void tryReadIn() { - while(inSock->framesAvailable() > 0 && outSock->canWrite()) + while(inSock->framesAvailable() > 0 && ((outSock && outSock->canWrite()) || detached)) { - ZWebSocket::Frame f = inSock->readFrame(); + WebSocket::Frame f = inSock->readFrame(); tryLogActivity(); @@ -437,27 +446,27 @@ void tryReadOut() { - while(outSock->framesAvailable() > 0 && 
inSock->canWrite()) + while(outSock->framesAvailable() > 0 && ((inSock && inSock->canWrite()) || detached)) { - ZWebSocket::Frame f = outSock->readFrame(); + WebSocket::Frame f = outSock->readFrame(); tryLogActivity(); if(detached) continue; - if(f.type == ZWebSocket::Frame::Text || f.type == ZWebSocket::Frame::Binary || f.type == ZWebSocket::Frame::Continuation) + if(f.type == WebSocket::Frame::Text || f.type == WebSocket::Frame::Binary || f.type == WebSocket::Frame::Continuation) { // we are skipping the rest of this message - if(f.type == ZWebSocket::Frame::Continuation && outReadInProgress == -1) + if(f.type == WebSocket::Frame::Continuation && outReadInProgress == -1) continue; - if(f.type != ZWebSocket::Frame::Continuation) + if(f.type != WebSocket::Frame::Continuation) outReadInProgress = (int)f.type; if(wsControl && acceptGripMessages) { - if(f.type == ZWebSocket::Frame::Text && f.data.startsWith("c:")) + if(f.type == WebSocket::Frame::Text && f.data.startsWith("c:")) { // grip messages must only be one frame if(!f.more) @@ -465,13 +474,13 @@ else outReadInProgress = -1; // ignore rest of message } - else if(f.type != ZWebSocket::Frame::Continuation && f.data.startsWith(messagePrefix)) + else if(f.type != WebSocket::Frame::Continuation && f.data.startsWith(messagePrefix)) { f.data = f.data.mid(messagePrefix.size()); inSock->writeFrame(f); inPendingBytes += f.data.size(); } - else if(f.type == ZWebSocket::Frame::Continuation) + else if(f.type == WebSocket::Frame::Continuation) { assert(outReadInProgress != -1); @@ -519,7 +528,7 @@ private slots: void in_readyRead() { - if(!detached && outSock && outSock->state() == ZWebSocket::Connected) + if((outSock && outSock->state() == WebSocket::Connected) || detached) tryReadIn(); } @@ -535,8 +544,15 @@ void in_peerClosed() { - if(!detached && outSock && outSock->state() != ZWebSocket::Closing) - outSock->close(); + if(detached) + { + inSock->close(); + } + else + { + if(outSock && outSock->state() != 
WebSocket::Closing) + outSock->close(); + } } void in_closed() @@ -544,7 +560,7 @@ delete inSock; inSock = 0; - if(!detached && outSock && outSock->state() != ZWebSocket::Closing) + if(!detached && outSock && outSock->state() != WebSocket::Closing) outSock->close(); tryFinish(); @@ -632,7 +648,7 @@ void out_peerClosed() { - if(!detached && inSock && inSock->state() != ZWebSocket::Closing) + if(!detached && inSock && inSock->state() != WebSocket::Closing) inSock->close(); } @@ -641,7 +657,7 @@ delete outSock; outSock = 0; - if(!detached && inSock && inSock->state() != ZWebSocket::Closing) + if(!detached && inSock && inSock->state() != WebSocket::Closing) inSock->close(); tryFinish(); @@ -649,7 +665,7 @@ void out_error() { - ZWebSocket::ErrorCondition e = outSock->errorCondition(); + WebSocket::ErrorCondition e = outSock->errorCondition(); log_debug("wsproxysession: %p target error state=%d, condition=%d", q, (int)state, (int)e); if(detached) @@ -667,12 +683,12 @@ switch(e) { - case ZWebSocket::ErrorConnect: - case ZWebSocket::ErrorConnectTimeout: - case ZWebSocket::ErrorTls: + case WebSocket::ErrorConnect: + case WebSocket::ErrorConnectTimeout: + case WebSocket::ErrorTls: tryAgain = true; break; - case ZWebSocket::ErrorRejected: + case WebSocket::ErrorRejected: reject(outSock->responseCode(), outSock->responseReason(), outSock->responseHeaders(), outSock->responseBody()); break; default: @@ -700,12 +716,12 @@ void wsControl_sendEventReceived(const QByteArray &contentType, const QByteArray &message) { // only send if we can, otherwise drop - if(inSock && inSock->state() != ZWebSocket::Closing && inSock->canWrite()) + if(inSock && inSock->state() != WebSocket::Closing && inSock->canWrite()) { if(contentType == "binary") - inSock->writeFrame(ZWebSocket::Frame(ZWebSocket::Frame::Binary, message, false)); + inSock->writeFrame(WebSocket::Frame(WebSocket::Frame::Binary, message, false)); else - inSock->writeFrame(ZWebSocket::Frame(ZWebSocket::Frame::Text, message, false)); 
+ inSock->writeFrame(WebSocket::Frame(WebSocket::Frame::Text, message, false)); inPendingBytes += message.size(); } @@ -719,7 +735,7 @@ detached = true; - if(outSock && outSock->state() != ZWebSocket::Closing) + if(outSock && outSock->state() != WebSocket::Closing) outSock->close(); } diff -Nru pushpin-0.0~20140620/proxy/src/zwebsocket.cpp pushpin-1.0.0/proxy/src/zwebsocket.cpp --- pushpin-0.0~20140620/proxy/src/zwebsocket.cpp 2014-06-25 13:18:48.000000000 +0000 +++ pushpin-1.0.0/proxy/src/zwebsocket.cpp 2014-09-15 17:04:42.000000000 +0000 @@ -73,7 +73,7 @@ int peerCloseCode; QVariant userData; bool pendingUpdate; - ZWebSocket::ErrorCondition errorCondition; + WebSocket::ErrorCondition errorCondition; QTimer *expireTimer; QTimer *keepAliveTimer; QList inFrames; @@ -870,7 +870,7 @@ }; ZWebSocket::ZWebSocket(QObject *parent) : - QObject(parent) + WebSocket(parent) { d = new Private(this); } @@ -942,7 +942,7 @@ d->reject(); } -ZWebSocket::State ZWebSocket::state() const +WebSocket::State ZWebSocket::state() const { switch(d->state) { @@ -1024,7 +1024,7 @@ return d->peerCloseCode; } -ZWebSocket::ErrorCondition ZWebSocket::errorCondition() const +WebSocket::ErrorCondition ZWebSocket::errorCondition() const { return d->errorCondition; } @@ -1034,7 +1034,7 @@ d->writeFrame(frame); } -ZWebSocket::Frame ZWebSocket::readFrame() +WebSocket::Frame ZWebSocket::readFrame() { return d->readFrame(); } diff -Nru pushpin-0.0~20140620/proxy/src/zwebsocket.h pushpin-1.0.0/proxy/src/zwebsocket.h --- pushpin-0.0~20140620/proxy/src/zwebsocket.h 2014-06-25 13:18:48.000000000 +0000 +++ pushpin-1.0.0/proxy/src/zwebsocket.h 2014-09-15 17:04:42.000000000 +0000 @@ -23,62 +23,18 @@ #include #include #include -#include #include "httpheaders.h" +#include "websocket.h" class ZhttpRequestPacket; class ZhttpResponsePacket; class ZhttpManager; -class ZWebSocket : public QObject +class ZWebSocket : public WebSocket { Q_OBJECT public: - enum State - { - Idle, - Connecting, - Connected, - Closing - }; - 
- enum ErrorCondition - { - ErrorGeneric, - ErrorPolicy, - ErrorConnect, - ErrorConnectTimeout, - ErrorTls, - ErrorRejected, - ErrorTimeout, - ErrorUnavailable - }; - - class Frame - { - public: - enum Type - { - Continuation, - Text, - Binary, - Ping, - Pong - }; - - Type type; - QByteArray data; - bool more; - - Frame(Type _type, const QByteArray &_data, bool _more) : - type(_type), - data(_data), - more(_more) - { - } - }; - // pair of sender + request id typedef QPair Rid; @@ -86,41 +42,43 @@ Rid rid() const; - QHostAddress peerAddress() const; + // reimplemented + + virtual QHostAddress peerAddress() const; - void setConnectHost(const QString &host); - void setConnectPort(int port); - void setIgnorePolicies(bool on); - void setIgnoreTlsErrors(bool on); - - void start(const QUrl &uri, const HttpHeaders &headers); - - void respondSuccess(const QByteArray &reason, const HttpHeaders &headers); - void respondError(int code, const QByteArray &reason, const HttpHeaders &headers, const QByteArray &body); - - State state() const; - QUrl requestUri() const; - HttpHeaders requestHeaders() const; - int responseCode() const; - QByteArray responseReason() const; - HttpHeaders responseHeaders() const; - QByteArray responseBody() const; - int framesAvailable() const; - bool canWrite() const; - int writeBytesAvailable() const; - int peerCloseCode() const; - ErrorCondition errorCondition() const; - - void writeFrame(const Frame &frame); - Frame readFrame(); - void close(int code = -1); + virtual void setConnectHost(const QString &host); + virtual void setConnectPort(int port); + virtual void setIgnorePolicies(bool on); + virtual void setIgnoreTlsErrors(bool on); + + virtual void start(const QUrl &uri, const HttpHeaders &headers); + + virtual void respondSuccess(const QByteArray &reason, const HttpHeaders &headers); + virtual void respondError(int code, const QByteArray &reason, const HttpHeaders &headers, const QByteArray &body); + + virtual State state() const; + virtual QUrl 
requestUri() const; + virtual HttpHeaders requestHeaders() const; + virtual int responseCode() const; + virtual QByteArray responseReason() const; + virtual HttpHeaders responseHeaders() const; + virtual QByteArray responseBody() const; + virtual int framesAvailable() const; + virtual bool canWrite() const; + virtual int writeBytesAvailable() const; + virtual int peerCloseCode() const; + virtual ErrorCondition errorCondition() const; + + virtual void writeFrame(const Frame &frame); + virtual Frame readFrame(); + virtual void close(int code = -1); signals: void connected(); void readyRead(); void framesWritten(int count, int contentBytes); - void peerClosed(); // emitted only if peer closes before we do - void closed(); // emitted after peer acks our close, or immediately if we were acking + void peerClosed(); + void closed(); void error(); private: diff -Nru pushpin-0.0~20140620/pushpin pushpin-1.0.0/pushpin --- pushpin-0.0~20140620/pushpin 2014-06-25 13:18:48.000000000 +0000 +++ pushpin-1.0.0/pushpin 2014-09-15 17:04:42.000000000 +0000 @@ -1,5 +1,6 @@ #!/usr/bin/env python default_config_dir = "/etc/pushpin" +version = '1.0.0' import sys import os @@ -19,6 +20,9 @@ break elif arg == "--verbose": verbose = True + elif arg == "--version": + print 'pushpin %s' % version + sys.exit(0) # config file in same dir as executable config_file_list.append(os.path.join(exedir, "pushpin.conf")) diff -Nru pushpin-0.0~20140620/README.md pushpin-1.0.0/README.md --- pushpin-0.0~20140620/README.md 2014-06-25 13:18:48.000000000 +0000 +++ pushpin-1.0.0/README.md 2014-09-15 17:06:10.000000000 +0000 @@ -1,9 +1,6 @@ Pushpin -------- -Date: June 20th, 2014 - -Author: Justin Karneges - +======= +Author: Justin Karneges Mailing List: http://lists.fanout.io/listinfo.cgi/fanout-users-fanout.io Read: diff -Nru pushpin-0.0~20140620/tools/publish_curl.sh pushpin-1.0.0/tools/publish_curl.sh --- pushpin-0.0~20140620/tools/publish_curl.sh 2014-06-25 13:18:48.000000000 +0000 +++ 
pushpin-1.0.0/tools/publish_curl.sh 2014-09-15 17:04:42.000000000 +0000 @@ -9,4 +9,4 @@ channel=$1 content=$2 -curl -d "{ \"items\": [ { \"channel\": \"$channel\", \"http-response\": { \"body\": \"$content\\n\" }, \"http-stream\": {\"content\": \"$content\\n\" } } ] }" http://localhost:5561/publish/ +curl -d "{ \"items\": [ { \"channel\": \"$channel\", \"http-response\": { \"body\": \"$content\\n\" }, \"http-stream\": {\"content\": \"$content\\n\" }, \"ws-message\": {\"content\": \"$content\\n\" } } ] }" http://localhost:5561/publish/ diff -Nru pushpin-0.0~20140620/tools/publish.py pushpin-1.0.0/tools/publish.py --- pushpin-0.0~20140620/tools/publish.py 2014-06-25 13:18:48.000000000 +0000 +++ pushpin-1.0.0/tools/publish.py 2014-09-15 17:04:42.000000000 +0000 @@ -3,7 +3,7 @@ import zmq if len(sys.argv) < 3: - print "usage: %s [channel] [content]" % sys.argv[0] + print 'usage: %s [channel] [content]' % sys.argv[0] sys.exit(1) channel = sys.argv[1] @@ -11,17 +11,20 @@ ctx = zmq.Context() sock = ctx.socket(zmq.PUSH) -sock.connect("tcp://127.0.0.1:5560") +sock.connect('tcp://localhost:5560') hr = dict() -hr["body"] = content + "\n" +hr['body'] = content + '\n' hs = dict() -hs["content"] = content + "\n" +hs['content'] = content + '\n' +ws = dict() +ws['content'] = content + '\n' item = dict() -item["channel"] = channel -item["http-response"] = hr -item["http-stream"] = hs +item['channel'] = channel +item['http-response'] = hr +item['http-stream'] = hs +item['ws-message'] = ws sock.send(tnetstring.dumps(item)) -print "Published" +print 'Published' diff -Nru pushpin-0.0~20140620/tools/testgripresponse_json.php pushpin-1.0.0/tools/testgripresponse_json.php --- pushpin-0.0~20140620/tools/testgripresponse_json.php 1970-01-01 00:00:00.000000000 +0000 +++ pushpin-1.0.0/tools/testgripresponse_json.php 2014-09-15 17:04:42.000000000 +0000 @@ -0,0 +1,17 @@ + +{ + "hold": { + "mode": "response", + "channels": [ + { + "name": "test" + } + ] + }, + "response": { + "headers": { + 
"Content-Type": "text/plain" + }, + "body": "nothing for now\n" + } +} diff -Nru pushpin-0.0~20140620/tools/testgripresponse.php pushpin-1.0.0/tools/testgripresponse.php --- pushpin-0.0~20140620/tools/testgripresponse.php 2014-06-25 13:18:48.000000000 +0000 +++ pushpin-1.0.0/tools/testgripresponse.php 2014-09-15 17:04:42.000000000 +0000 @@ -1,17 +1,4 @@ - -{ - "hold": { - "mode": "response", - "channels": [ - { - "name": "test" - } - ] - }, - "response": { - "headers": { - "Content-Type": "text/plain" - }, - "body": "nothing for now\n" - } -} + +nothing for now diff -Nru pushpin-0.0~20140620/tools/testgripstream.php pushpin-1.0.0/tools/testgripstream.php --- pushpin-0.0~20140620/tools/testgripstream.php 2014-06-25 13:18:48.000000000 +0000 +++ pushpin-1.0.0/tools/testgripstream.php 2014-09-15 17:04:42.000000000 +0000 @@ -1,17 +1,4 @@ - -{ - "hold": { - "mode": "stream", - "channels": [ - { - "name": "test" - } - ] - }, - "response": { - "headers": { - "Content-Type": "text/plain" - }, - "body": "[stream open]\n" - } -} + +[stream open] diff -Nru pushpin-0.0~20140620/tools/testgripwebsocket.php pushpin-1.0.0/tools/testgripwebsocket.php --- pushpin-0.0~20140620/tools/testgripwebsocket.php 1970-01-01 00:00:00.000000000 +0000 +++ pushpin-1.0.0/tools/testgripwebsocket.php 2014-09-15 17:04:42.000000000 +0000 @@ -0,0 +1,21 @@ +