diff -Nru opendht-1.5.0/CMakeLists.txt opendht-1.6.0/CMakeLists.txt --- opendht-1.5.0/CMakeLists.txt 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/CMakeLists.txt 2018-02-26 22:19:32.000000000 +0000 @@ -1,7 +1,7 @@ cmake_minimum_required (VERSION 3.1) project (opendht) set (opendht_VERSION_MAJOR 1) -set (opendht_VERSION_MINOR 5.0) +set (opendht_VERSION_MINOR 6.0) set (opendht_VERSION ${opendht_VERSION_MAJOR}.${opendht_VERSION_MINOR}) set (PACKAGE_VERSION ${opendht_VERSION}) set (VERSION "${opendht_VERSION}") @@ -17,6 +17,7 @@ option (OPENDHT_LTO "Build with LTO" OFF) option (OPENDHT_SANITIZE "Build with address sanitizer and stack protector" OFF) option (OPENDHT_PROXY_SERVER "Enable DHT proxy server, use Restbed and jsoncpp" OFF) +option (OPENDHT_PUSH_NOTIFICATIONS "Enable push notifications support" OFF) option (OPENDHT_PROXY_SERVER_IDENTITY "Allow clients to use the node identity" OFF) option (OPENDHT_PROXY_CLIENT "Enable DHT proxy client, use Restbed and jsoncpp" OFF) @@ -145,6 +146,7 @@ include/opendht/node.h include/opendht/value.h include/opendht/dht.h + include/opendht/dht_interface.h include/opendht/callbacks.h include/opendht/routing_table.h include/opendht/node_cache.h @@ -170,13 +172,36 @@ ) list (APPEND opendht_SOURCES src/dht_proxy_server.cpp - src/base64.h - src/base64.cpp ) else () add_definitions(-DENABLE_PROXY_SERVER=false) endif () +if (OPENDHT_PUSH_NOTIFICATIONS) # tested by both the proxy server and the proxy client headers, so define it outside the server-only branch + add_definitions(-DOPENDHT_PUSH_NOTIFICATIONS=true) +else () + add_definitions(-DOPENDHT_PUSH_NOTIFICATIONS=false) +endif () +if (OPENDHT_PROXY_CLIENT) + add_definitions(-DOPENDHT_PROXY_CLIENT=true) + list (APPEND opendht_HEADERS + include/opendht/dht_proxy_client.h + ) + list (APPEND opendht_SOURCES + src/dht_proxy_client.cpp + ) +else () + add_definitions(-DOPENDHT_PROXY_CLIENT=false) +endif () + +if (OPENDHT_PROXY_SERVER OR OPENDHT_PROXY_CLIENT) + list (APPEND opendht_SOURCES + src/base64.h + src/base64.cpp + ) +endif () + + if(OPENDHT_ARGON2) # make sure argon2 submodule
is up to date and initialized message("Initializing Argon2 submodule") diff -Nru opendht-1.5.0/configure.ac opendht-1.6.0/configure.ac --- opendht-1.5.0/configure.ac 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/configure.ac 2018-02-26 22:19:32.000000000 +0000 @@ -1,6 +1,6 @@ dnl define macros m4_define([opendht_major_version], 1) -m4_define([opendht_minor_version], 5) +m4_define([opendht_minor_version], 6) m4_define([opendht_patch_version], 0) m4_define([opendht_version], [opendht_major_version.opendht_minor_version.opendht_patch_version]) @@ -130,13 +130,28 @@ AC_ARG_ENABLE([proxy_server], AS_HELP_STRING([--enable-proxy-server], [Enable proxy server ability]), proxy_server=yes, proxy_server=no) AM_CONDITIONAL(ENABLE_PROXY_SERVER, test x$proxy_server == xyes) +AC_ARG_ENABLE([push_notifications], AS_HELP_STRING([--enable-push-notifications], [Enable push notifications support]), push_notifications=yes, push_notifications=no) +AM_CONDITIONAL(ENABLE_PUSH_NOTIFICATIONS, test x$push_notifications == xyes) +AC_ARG_ENABLE([proxy_server_identity], AS_HELP_STRING([--enable-proxy-server-identity], + [Enable proxy server ability]), proxy_server_identity=yes, proxy_server_identity=no) +AM_CONDITIONAL(ENABLE_PROXY_SERVER_IDENTITY, test x$proxy_server_identity == xyes) +AC_ARG_ENABLE([proxy_client], AS_HELP_STRING([--enable-proxy-client], [Enable proxy client ability]), proxy_client=yes, proxy_client=no) +AM_CONDITIONAL(ENABLE_PROXY_CLIENT, test x$proxy_client == xyes) + AM_COND_IF([ENABLE_PROXY_SERVER], [ AC_CHECK_LIB(restbed, exit,, AC_MSG_ERROR([Missing restbed files])) - PKG_CHECK_MODULES([Jsoncpp], [jsoncpp >= 1.7.4]) + PKG_CHECK_MODULES([Jsoncpp], [jsoncpp >= 1.7.2]) CPPFLAGS+=" -DOPENDHT_PROXY_SERVER=true -ljsoncpp -lrestbed" ], [ CPPFLAGS+=" -DOPENDHT_PROXY_SERVER=false" ]) + +AM_COND_IF([ENABLE_PUSH_NOTIFICATIONS], [ + CPPFLAGS+=" -DOPENDHT_PUSH_NOTIFICATIONS=true" +], [ + CPPFLAGS+=" -DOPENDHT_PUSH_NOTIFICATIONS=false" +]) + 
AC_ARG_ENABLE([proxy_server_identity], AS_HELP_STRING([--enable-proxy-server-identity], [Enable proxy server ability]), proxy_server_identity=yes, proxy_server_identity=no) AM_CONDITIONAL(ENABLE_PROXY_SERVER_IDENTITY, test x$proxy_server_identity == xyes -a x$proxy_server == xyes) @@ -146,6 +161,20 @@ CPPFLAGS+=" -DOPENDHT_PROXY_SERVER_IDENTITY=false" ]) +AM_COND_IF([ENABLE_PROXY_CLIENT], [ + CPPFLAGS+=" -DOPENDHT_PROXY_CLIENT=true" +], [ + CPPFLAGS+=" -DOPENDHT_PROXY_CLIENT=false" +]) + +AM_CONDITIONAL(PROXY_CLIENT_OR_SERVER, test x$proxy_client == xyes || test x$proxy_server == xyes) +AM_COND_IF([PROXY_CLIENT_OR_SERVER], [ + AC_CHECK_LIB(restbed, exit,, AC_MSG_ERROR([Missing restbed files])) + PKG_CHECK_MODULES([Jsoncpp], [jsoncpp >= 1.7.4]) + CPPFLAGS="${CPPFLAGS} ${Jsoncpp_CFLAGS}" + LDFLAGS="${LDFLAGS} ${Jsoncpp_LIBS} -lrestbed" +], []) + AC_CONFIG_FILES([doc/Doxyfile doc/Makefile]) diff -Nru opendht-1.5.0/debian/changelog opendht-1.6.0/debian/changelog --- opendht-1.5.0/debian/changelog 2018-02-01 17:47:24.000000000 +0000 +++ opendht-1.6.0/debian/changelog 2018-02-27 06:11:11.000000000 +0000 @@ -1,3 +1,10 @@ +opendht (1.6.0-1) unstable; urgency=medium + + * d/watch: exclude release candidates. + * d/copyright: fix insecure-copyright-format-uri. + + -- Alexandre Viau Tue, 27 Feb 2018 06:11:11 +0000 + opendht (1.5.0-3) unstable; urgency=medium * Build with msgpack-c v2 API.
diff -Nru opendht-1.5.0/debian/copyright opendht-1.6.0/debian/copyright --- opendht-1.5.0/debian/copyright 2018-02-01 17:47:24.000000000 +0000 +++ opendht-1.6.0/debian/copyright 2018-02-27 06:11:11.000000000 +0000 @@ -1,4 +1,4 @@ -Format: http://www.debian.org/doc/packaging-manuals/copyright-format/1.0/ +Format: https://www.debian.org/doc/packaging-manuals/copyright-format/1.0/ Upstream-Name: ring Upstream-Contact: Alexandre Viau Source: https://github.com/savoirfairelinux/opendht diff -Nru opendht-1.5.0/debian/patches/pkgconfig-static.patch opendht-1.6.0/debian/patches/pkgconfig-static.patch --- opendht-1.5.0/debian/patches/pkgconfig-static.patch 2018-02-01 17:47:24.000000000 +0000 +++ opendht-1.6.0/debian/patches/pkgconfig-static.patch 2018-02-27 06:11:11.000000000 +0000 @@ -1,6 +1,6 @@ Description: Add missing Libs flags Debian only ships libopendht.a. The pkg-config file is only - included in the dev pacakge. This modifies it so that the + included in the dev package. This modifies it so that the linking works with libopendht.a Author: Alexandre Viau diff -Nru opendht-1.5.0/debian/watch opendht-1.6.0/debian/watch --- opendht-1.5.0/debian/watch 2018-02-01 17:47:24.000000000 +0000 +++ opendht-1.6.0/debian/watch 2018-02-27 06:11:11.000000000 +0000 @@ -2,5 +2,6 @@ opts=\ filenamemangle=s/.+\/v?(\d\S*)\.tar\.gz/opendht-$1\.tar\.gz/,\ repacksuffix=~dfsg1,\ +uversionmangle=s/(\d)[_\.\-\+]?((RC|rc|pre|dev|beta|alpha)\d*)$/$1~$2/,\ dversionmangle=s/\~dfsg\d*$// \ https://github.com/savoirfairelinux/opendht/tags .*/v?(\d\S*)\.tar\.gz diff -Nru opendht-1.5.0/docker/DockerfileDepsProxy opendht-1.6.0/docker/DockerfileDepsProxy --- opendht-1.5.0/docker/DockerfileDepsProxy 1970-01-01 00:00:00.000000000 +0000 +++ opendht-1.6.0/docker/DockerfileDepsProxy 2018-02-26 22:19:32.000000000 +0000 @@ -0,0 +1,16 @@ +FROM aberaud/opendht-deps + +# install jsoncpp +RUN apt-get install libjsoncpp-dev -y + +# install restbed dependencies +RUN sed -i -e 
's/archive.ubuntu.com\|security.ubuntu.com/old-releases.ubuntu.com/g' /etc/apt/sources.list # LTS... avoid 404 for libasio-dev +RUN apt-get update -y +RUN apt-get install libasio-dev -y + +# build restbed from sources +RUN git clone --recursive https://github.com/corvusoft/restbed.git +WORKDIR restbed/build +RUN cmake -DBUILD_TESTS=NO -DBUILD_EXAMPLES=NO -DBUILD_SSL=NO -DBUILD_SHARED=YES -DCMAKE_INSTALL_PREFIX=/usr .. +RUN make -j 8 install +RUN mv /usr/library/librestbed* /usr/lib/ diff -Nru opendht-1.5.0/docker/DockerfileLlvm opendht-1.6.0/docker/DockerfileLlvm --- opendht-1.5.0/docker/DockerfileLlvm 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/docker/DockerfileLlvm 1970-01-01 00:00:00.000000000 +0000 @@ -1,6 +0,0 @@ -FROM aberaud/opendht-deps-llvm -MAINTAINER Adrien Béraud -RUN git clone https://github.com/savoirfairelinux/opendht.git \ - && cd opendht && mkdir build && cd build \ - && cmake .. -DCMAKE_INSTALL_PREFIX=/usr -DOPENDHT_PYTHON=On && make -j8 && make install \ - && cd ../.. && rm -rf opendht diff -Nru opendht-1.5.0/docker/DockerfileTravis opendht-1.6.0/docker/DockerfileTravis --- opendht-1.5.0/docker/DockerfileTravis 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/docker/DockerfileTravis 2018-02-26 22:19:32.000000000 +0000 @@ -2,5 +2,4 @@ MAINTAINER Adrien Béraud COPY . /root/opendht RUN cd /root/opendht && mkdir build && cd build \ - && cmake -DCMAKE_INSTALL_PREFIX=/usr -DOPENDHT_PYTHON=On -DOPENDHT_LTO=On .. && make -j8 && make install \ - && cd ../.. && rm -rf opendht + && cmake -DCMAKE_INSTALL_PREFIX=/usr -DOPENDHT_PYTHON=On -DOPENDHT_LTO=On .. && make -j8 && make install diff -Nru opendht-1.5.0/docker/DockerfileTravisLlvm opendht-1.6.0/docker/DockerfileTravisLlvm --- opendht-1.5.0/docker/DockerfileTravisLlvm 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/docker/DockerfileTravisLlvm 2018-02-26 22:19:32.000000000 +0000 @@ -2,5 +2,4 @@ MAINTAINER Adrien Béraud COPY . 
/root/opendht RUN cd /root/opendht && mkdir build && cd build \ - && cmake -DCMAKE_INSTALL_PREFIX=/usr -DOPENDHT_PYTHON=On .. && make -j8 && make install \ - && cd ../.. && rm -rf opendht + && cmake -DCMAKE_INSTALL_PREFIX=/usr -DOPENDHT_PYTHON=On .. && make -j8 && make install diff -Nru opendht-1.5.0/docker/DockerfileTravisProxy opendht-1.6.0/docker/DockerfileTravisProxy --- opendht-1.5.0/docker/DockerfileTravisProxy 1970-01-01 00:00:00.000000000 +0000 +++ opendht-1.6.0/docker/DockerfileTravisProxy 2018-02-26 22:19:32.000000000 +0000 @@ -0,0 +1,3 @@ +FROM opendht-deps-proxy +MAINTAINER Adrien Béraud +COPY . /root/opendht diff -Nru opendht-1.5.0/include/opendht/callbacks.h opendht-1.6.0/include/opendht/callbacks.h --- opendht-1.5.0/include/opendht/callbacks.h 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/include/opendht/callbacks.h 2018-02-26 22:19:32.000000000 +0000 @@ -1,7 +1,8 @@ /* * Copyright (C) 2014-2017 Savoir-faire Linux Inc. - * Author : Adrien Béraud + * Authors: Adrien Béraud * Simon Désaulniers + * Sébastien Blin * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -44,18 +45,20 @@ }; struct OPENDHT_PUBLIC NodeStats { - unsigned good_nodes, - dubious_nodes, - cached_nodes, - incoming_nodes; - unsigned table_depth; + unsigned good_nodes {0}, + dubious_nodes {0}, + cached_nodes {0}, + incoming_nodes {0}; + unsigned table_depth {0}; unsigned getKnownNodes() const { return good_nodes + dubious_nodes; } std::string toString() const; -#if OPENDHT_PROXY_SERVER +#if OPENDHT_PROXY_SERVER || OPENDHT_PROXY_CLIENT /** * Build a json object from a NodeStats */ Json::Value toJson() const; + NodeStats() {}; + explicit NodeStats(const Json::Value& v); #endif //OPENDHT_PROXY_SERVER }; diff -Nru opendht-1.5.0/include/opendht/crypto.h opendht-1.6.0/include/opendht/crypto.h --- opendht-1.5.0/include/opendht/crypto.h 2017-11-17 18:34:31.000000000 +0000 +++ 
opendht-1.6.0/include/opendht/crypto.h 2018-02-26 22:19:32.000000000 +0000 @@ -387,6 +387,7 @@ /** Read certificate alternative names */ std::vector> getAltNames() const; + std::chrono::system_clock::time_point getActivation() const; std::chrono::system_clock::time_point getExpiration() const; /** diff -Nru opendht-1.5.0/include/opendht/dht.h opendht-1.6.0/include/opendht/dht.h --- opendht-1.5.0/include/opendht/dht.h 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/include/opendht/dht.h 2018-02-26 22:19:32.000000000 +0000 @@ -1,7 +1,8 @@ /* * Copyright (C) 2014-2017 Savoir-faire Linux Inc. - * Author(s) : Adrien Béraud - * Simon Désaulniers + * Authors: Adrien Béraud + * Simon Désaulniers + * Sébastien Blin * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -26,7 +27,7 @@ #include "scheduler.h" #include "routing_table.h" #include "callbacks.h" -#include "log_enable.h" +#include "dht_interface.h" #include #include @@ -58,22 +59,16 @@ * Must be given open UDP sockets and ::periodic must be * called regularly. */ -class OPENDHT_PUBLIC Dht { +class OPENDHT_PUBLIC Dht final : public DhtInterface { public: - // [[deprecated]] - using NodeExport = dht::NodeExport; - - // [[deprecated]] - using Status = NodeStatus; - Dht(); /** * Initialise the Dht with two open sockets (for IPv4 and IP6) * and an ID for the node. */ - Dht(int s, int s6, Config config); + Dht(const int& s, const int& s6, Config config); virtual ~Dht(); /** @@ -103,18 +98,6 @@ */ bool isRunning(sa_family_t af = 0) const; - /** - * Enable or disable logging of DHT internal messages - */ - void setLoggers(LogMethod error = NOLOG, LogMethod warn = NOLOG, LogMethod debug = NOLOG); - - /** - * Only print logs related to the given InfoHash (if given), or disable filter (if zeroes). 
- */ - void setLogFilter(const InfoHash& f) { - DHT_LOG.setFilter(f); - } - virtual void registerType(const ValueType& type) { types[type.id] = type; } @@ -248,7 +231,7 @@ * * @return a token to cancel the listener later. */ - virtual size_t listen(const InfoHash&, GetCallback, Value::Filter&&={}, Where&& w = {}); + virtual size_t listen(const InfoHash&, GetCallback, Value::Filter={}, Where w = {}); virtual size_t listen(const InfoHash& key, GetCallbackSimple cb, Value::Filter f={}, Where w = {}) { return listen(key, bindGetCb(cb), std::forward(f), std::forward(w)); } @@ -307,10 +290,8 @@ std::vector getPublicAddress(sa_family_t family = 0); -protected: - Logger DHT_LOG; - bool logFilerEnable_ {}; - InfoHash logFiler_ {}; + void pushNotificationReceived(const std::map&) {} + void resubscribe(unsigned) {} private: diff -Nru opendht-1.5.0/include/opendht/dht_interface.h opendht-1.6.0/include/opendht/dht_interface.h --- opendht-1.5.0/include/opendht/dht_interface.h 1970-01-01 00:00:00.000000000 +0000 +++ opendht-1.6.0/include/opendht/dht_interface.h 2018-02-26 22:19:32.000000000 +0000 @@ -0,0 +1,250 @@ +/* + * Copyright (C) 2014-2017 Savoir-faire Linux Inc. + * Author: Sébastien Blin + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . 
+ */ + +#pragma once + +#include "infohash.h" +#include "log_enable.h" + +namespace dht { + +class OPENDHT_PUBLIC DhtInterface { +public: + DhtInterface() = default; + virtual ~DhtInterface() = default; + + // [[deprecated]] + using Status = NodeStatus; + // [[deprecated]] + using NodeExport = dht::NodeExport; + + /** + * Get the current status of the node for the given family. + */ + virtual NodeStatus getStatus(sa_family_t af) const = 0; + virtual NodeStatus getStatus() const = 0; + + /** + * Get the ID of the node. + */ + virtual const InfoHash& getNodeId() const = 0; + + /** + * Performs final operations before quitting. + */ + virtual void shutdown(ShutdownCallback cb) = 0; + + /** + * Returns true if the node is running (have access to an open socket). + * + * af: address family. If non-zero, will return true if the node + * is running for the provided family. + */ + virtual bool isRunning(sa_family_t af = 0) const = 0; + + virtual void registerType(const ValueType& type) = 0; + + virtual const ValueType& getType(ValueType::Id type_id) const = 0; + + /** + * Insert a node in the main routing table. + * The node is not pinged, so this should be + * used to bootstrap efficiently from previously known nodes. + */ + virtual void insertNode(const InfoHash& id, const SockAddr&) = 0; + virtual void insertNode(const InfoHash& id, const sockaddr* sa, socklen_t salen) = 0; + virtual void insertNode(const NodeExport& n) = 0; + + virtual void pingNode(const sockaddr*, socklen_t, DoneCallbackSimple&& cb={}) = 0; + + virtual time_point periodic(const uint8_t *buf, size_t buflen, const SockAddr&) = 0; + virtual time_point periodic(const uint8_t *buf, size_t buflen, const sockaddr* from, socklen_t fromlen) = 0; + + /** + * Get a value by searching on all available protocols (IPv4, IPv6), + * and call the provided get callback when values are found at key. + * The operation will start as soon as the node is connected to the network. 
+ * @param cb a function called when new values are found on the network. + * It should return false to stop the operation. + * @param donecb a function called when the operation is complete. + cb and donecb won't be called again afterward. + * @param f a filter function used to prefilter values. + */ + virtual void get(const InfoHash& key, GetCallback cb, DoneCallback donecb={}, Value::Filter&& f={}, Where&& w = {}) = 0; + virtual void get(const InfoHash& key, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter&& f={}, Where&& w = {}) = 0; + virtual void get(const InfoHash& key, GetCallbackSimple cb, DoneCallback donecb={}, Value::Filter&& f={}, Where&& w = {}) = 0; + virtual void get(const InfoHash& key, GetCallbackSimple cb, DoneCallbackSimple donecb, Value::Filter&& f={}, Where&& w = {}) = 0; + + /** + * Similar to Dht::get, but sends a Query to filter data remotely. + * @param key the key for which to query data for. + * @param cb a function called when new values are found on the network. + * It should return false to stop the operation. + * @param done_cb a function called when the operation is complete. + cb and done_cb won't be called again afterward. + * @param q a query used to filter values on the remotes before they send a + * response. + */ + virtual void query(const InfoHash& key, QueryCallback cb, DoneCallback done_cb = {}, Query&& q = {}) = 0; + virtual void query(const InfoHash& key, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query&& q = {}) = 0; + + /** + * Get locally stored data for the given hash. + */ + virtual std::vector> getLocal(const InfoHash& key, Value::Filter f = Value::AllFilter()) const = 0; + + /** + * Get locally stored data for the given key and value id. + */ + virtual Sp getLocalById(const InfoHash& key, Value::Id vid) const = 0; + + /** + * Announce a value on all available protocols (IPv4, IPv6). + * + * The operation will start as soon as the node is connected to the network. 
+ * The done callback will be called once, when the first announce succeeds, or fails. + */ + virtual void put(const InfoHash& key, + Sp, + DoneCallback cb=nullptr, + time_point created=time_point::max(), + bool permanent = false) = 0; + virtual void put(const InfoHash& key, + const Sp& v, + DoneCallbackSimple cb, + time_point created=time_point::max(), + bool permanent = false) = 0; + virtual void put(const InfoHash& key, + Value&& v, + DoneCallback cb=nullptr, + time_point created=time_point::max(), + bool permanent = false) = 0; + virtual void put(const InfoHash& key, + Value&& v, + DoneCallbackSimple cb, + time_point created=time_point::max(), + bool permanent = false) = 0; + + /** + * Get data currently being put at the given hash. + */ + virtual std::vector> getPut(const InfoHash&) = 0; + + /** + * Get data currently being put at the given hash with the given id. + */ + virtual Sp getPut(const InfoHash&, const Value::Id&) = 0; + + /** + * Stop any put/announce operation at the given location, + * for the value with the given id. + */ + virtual bool cancelPut(const InfoHash&, const Value::Id&) = 0; + + /** + * Listen on the network for any changes involving a specified hash. + * The node will register to receive updates from relevant nodes when + * new values are added or removed. + * + * @return a token to cancel the listener later. + */ + virtual size_t listen(const InfoHash&, GetCallback, Value::Filter={}, Where w = {}) = 0; + virtual size_t listen(const InfoHash& key, GetCallbackSimple cb, Value::Filter f={}, Where w = {}) = 0; + + virtual bool cancelListen(const InfoHash&, size_t token) = 0; + + /** + * Inform the DHT of lower-layer connectivity changes. + * This will cause the DHT to assume a public IP address change. + * The DHT will recontact neighbor nodes, re-register for listen ops etc.
+ */ + virtual void connectivityChanged(sa_family_t) = 0; + virtual void connectivityChanged() = 0; + + /** + * Get the list of good nodes for local storage saving purposes + * The list is ordered to minimize the back-to-work delay. + */ + virtual std::vector exportNodes() = 0; + + virtual std::vector exportValues() const = 0; + virtual void importValues(const std::vector&) = 0; + + virtual NodeStats getNodesStats(sa_family_t af) const = 0; + + virtual std::string getStorageLog() const = 0; + virtual std::string getStorageLog(const InfoHash&) const = 0; + + virtual std::string getRoutingTablesLog(sa_family_t) const = 0; + virtual std::string getSearchesLog(sa_family_t) const = 0; + virtual std::string getSearchLog(const InfoHash&, sa_family_t af = AF_UNSPEC) const = 0; + + virtual void dumpTables() const = 0; + virtual std::vector getNodeMessageStats(bool in = false) = 0; + + /** + * Set the in-memory storage limit in bytes + */ + virtual void setStorageLimit(size_t limit = DEFAULT_STORAGE_LIMIT) = 0; + + /** + * Returns the total memory usage of stored values and the number + * of stored values. + */ + virtual std::pair getStoreSize() const = 0; + + virtual std::vector getPublicAddress(sa_family_t family = 0) = 0; + + /** + * Enable or disable logging of DHT internal messages + */ + virtual void setLoggers(LogMethod error = NOLOG, LogMethod warn = NOLOG, LogMethod debug = NOLOG) + { + DHT_LOG.DEBUG = debug; + DHT_LOG.WARN = warn; + DHT_LOG.ERR = error; + } + + /** + * Only print logs related to the given InfoHash (if given), or disable filter (if zeroes). 
+ */ + virtual void setLogFilter(const InfoHash& f) + { + DHT_LOG.setFilter(f); + } + + virtual void setPushNotificationToken(const std::string&) {}; + + /** + * Call linked callback with a push notification + * @param notification to process + */ + virtual void pushNotificationReceived(const std::map& data) = 0; + /** + * Refresh a listen via a token + * @param token + */ + virtual void resubscribe(unsigned token) = 0; + +protected: + bool logFilerEnable_ {}; + InfoHash logFiler_ {}; + Logger DHT_LOG; +}; + +} // namespace dht diff -Nru opendht-1.5.0/include/opendht/dht_proxy_client.h opendht-1.6.0/include/opendht/dht_proxy_client.h --- opendht-1.5.0/include/opendht/dht_proxy_client.h 1970-01-01 00:00:00.000000000 +0000 +++ opendht-1.6.0/include/opendht/dht_proxy_client.h 2018-02-26 22:19:32.000000000 +0000 @@ -0,0 +1,355 @@ +/* + * Copyright (C) 2016-2018 Savoir-faire Linux Inc. + * Author: Sébastien Blin + * Adrien Béraud + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . 
+ */ + +#if OPENDHT_PROXY_CLIENT + +#pragma once + +#include +#include +#include + +#include "callbacks.h" +#include "def.h" +#include "dht_interface.h" +#include "scheduler.h" + +namespace restbed { + class Request; +} + +namespace Json { + class Value; +} + +namespace dht { + +class OPENDHT_PUBLIC DhtProxyClient final : public DhtInterface { +public: + + DhtProxyClient() : scheduler(DHT_LOG) {} + + explicit DhtProxyClient(std::function loopSignal, const std::string& serverHost, const std::string& pushClientId = ""); + + virtual void setPushNotificationToken(const std::string& token) { +#if OPENDHT_PUSH_NOTIFICATIONS + deviceKey_ = token; +#endif + } + + virtual ~DhtProxyClient(); + + /** + * Get the ID of the node. + */ + inline const InfoHash& getNodeId() const { return myid; } + + /** + * Get the current status of the node for the given family. + */ + NodeStatus getStatus(sa_family_t af) const; + NodeStatus getStatus() const { + return std::max(getStatus(AF_INET), getStatus(AF_INET6)); + } + + /** + * Performs final operations before quitting. + */ + void shutdown(ShutdownCallback cb); + + /** + * Returns true if the node is running (have access to an open socket). + * + * af: address family. If non-zero, will return true if the node + * is running for the provided family. + */ + bool isRunning(sa_family_t af = 0) const; + + /** + * Get a value by asking the proxy and call the provided get callback when + * values are found at key. + * The operation will start as soon as the node is connected to the network. + * @param cb a function called when new values are found on the network. + * It should return false to stop the operation. + * @param donecb a function called when the operation is complete. + cb and donecb won't be called again afterward. + * @param f a filter function used to prefilter values. 
+ */ + virtual void get(const InfoHash& key, GetCallback cb, DoneCallback donecb={}, Value::Filter&& f={}, Where&& w = {}); + virtual void get(const InfoHash& key, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter&& f={}, Where&& w = {}) { + get(key, cb, bindDoneCb(donecb), std::forward(f), std::forward(w)); + } + virtual void get(const InfoHash& key, GetCallbackSimple cb, DoneCallback donecb={}, Value::Filter&& f={}, Where&& w = {}) { + get(key, bindGetCb(cb), donecb, std::forward(f), std::forward(w)); + } + virtual void get(const InfoHash& key, GetCallbackSimple cb, DoneCallbackSimple donecb, Value::Filter&& f={}, Where&& w = {}) { + get(key, bindGetCb(cb), bindDoneCb(donecb), std::forward(f), std::forward(w)); + } + + void get(const InfoHash& key, const GetCallback& cb, DoneCallback donecb, const Value::Filter& filterChain); + + /** + * Announce a value on all available protocols (IPv4, IPv6). + * + * The operation will start as soon as the node is connected to the network. + * The done callback will be called once, when the first announce succeeds, or fails. + * NOTE: For now, created parameter is ignored. 
+ */ + void put(const InfoHash& key, + Sp, + DoneCallback cb=nullptr, + time_point created=time_point::max(), + bool permanent = false); + void put(const InfoHash& key, + const Sp& v, + DoneCallbackSimple cb, + time_point created=time_point::max(), + bool permanent = false) + { + put(key, v, bindDoneCb(cb), created, permanent); + } + + void put(const InfoHash& key, + Value&& v, + DoneCallback cb=nullptr, + time_point created=time_point::max(), + bool permanent = false) + { + put(key, std::make_shared(std::move(v)), cb, created, permanent); + } + void put(const InfoHash& key, + Value&& v, + DoneCallbackSimple cb, + time_point created=time_point::max(), + bool permanent = false) + { + put(key, std::forward(v), bindDoneCb(cb), created, permanent); + } + + /** + * @param af the socket family + * @return node stats from the proxy + */ + NodeStats getNodesStats(sa_family_t af) const; + + /** + * @param family the socket family + * @return public address + */ + std::vector getPublicAddress(sa_family_t family = 0); + + /** + * Listen on the network for any changes involving a specified hash. + * The node will register to receive updates from relevant nodes when + * new values are added or removed. + * + * @return a token to cancel the listener later.
+ */ + virtual size_t listen(const InfoHash&, GetCallback, Value::Filter={}, Where={}); + virtual size_t listen(const InfoHash& key, GetCallbackSimple cb, Value::Filter f={}, Where w = {}) { + return listen(key, bindGetCb(cb), std::forward(f), std::forward(w)); + } + virtual bool cancelListen(const InfoHash&, size_t token); + + /** + * Call linked callback with a push notification + * @param notification to process + */ + void pushNotificationReceived(const std::map& notification); + /** + * Refresh a listen via a token + * @param token + */ + void resubscribe(const unsigned token); + + time_point periodic(const uint8_t*, size_t, const SockAddr&); + time_point periodic(const uint8_t *buf, size_t buflen, const sockaddr* from, socklen_t fromlen) { + return periodic(buf, buflen, SockAddr(from, fromlen)); + } + + + /** + * Similar to Dht::get, but sends a Query to filter data remotely. + * @param key the key for which to query data for. + * @param cb a function called when new values are found on the network. + * It should return false to stop the operation. + * @param done_cb a function called when the operation is complete. + cb and done_cb won't be called again afterward. + * @param q a query used to filter values on the remotes before they send a + * response. + */ + virtual void query(const InfoHash& /*key*/, QueryCallback /*cb*/, DoneCallback /*done_cb*/ = {}, Query&& /*q*/ = {}) { } + virtual void query(const InfoHash& key, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query&& q = {}) { + query(key, cb, bindDoneCb(done_cb), std::forward(q)); + } + + /** + * Get data currently being put at the given hash. + */ + std::vector> getPut(const InfoHash&) { return {}; } + + /** + * Get data currently being put at the given hash with the given id. + */ + Sp getPut(const InfoHash&, const Value::Id&) { return {}; } + + /** + * Stop any put/announce operation at the given location, + * for the value with the given id. 
+ */ + bool cancelPut(const InfoHash&, const Value::Id&) { return false; } + + void pingNode(const sockaddr*, socklen_t, DoneCallbackSimple&& /*cb*/={}) { } + + /** + * NOTE: The following methods will not be implemented because the + * DhtProxyClient doesn't have any storage nor synchronization process + */ + void insertNode(const InfoHash&, const SockAddr&) { } + void insertNode(const InfoHash&, const sockaddr*, socklen_t) { } + void insertNode(const NodeExport&) { } + std::pair getStoreSize() const { return {}; } + virtual void registerType(const ValueType&) { } + const ValueType& getType(ValueType::Id) const { return NO_VALUE; } + std::vector> getLocal(const InfoHash&, Value::Filter) const { return {}; } + Sp getLocalById(const InfoHash&, Value::Id) const { return {}; } + std::vector exportNodes() { return {}; } + std::vector exportValues() const { return {}; } + void importValues(const std::vector&) {} + std::string getStorageLog() const { return {}; } + std::string getStorageLog(const InfoHash&) const { return {}; } + std::string getRoutingTablesLog(sa_family_t) const { return {}; } + std::string getSearchesLog(sa_family_t) const { return {}; } + std::string getSearchLog(const InfoHash&, sa_family_t) const { return {}; } + void dumpTables() const {} + std::vector getNodeMessageStats(bool) { return {}; } + void setStorageLimit(size_t) {} + void connectivityChanged(sa_family_t) {} + void connectivityChanged() { } + +private: + const ValueType NO_VALUE; + + /** + * Start the connection with a server. 
+ */ + void startProxy(); + + /** + * Get information from the proxy node + * @return the JSON returned by the proxy + */ + void getProxyInfos(); + void onProxyInfos(const Json::Value& val); + SockAddr parsePublicAddress(const Json::Value& val); + + void opFailed(); + + /** + * Initialize statusIpvX_ + */ + void getConnectivityStatus(); + /** + * cancel all Listeners + */ + void cancelAllListeners(); + /** + * cancel all Operations + */ + void cancelAllOperations(); + std::string serverHost_; + std::string pushClientId_; + + std::atomic_flag ongoingStatusUpdate_ = ATOMIC_FLAG_INIT; + NodeStatus statusIpv4_ {NodeStatus::Disconnected}; + NodeStatus statusIpv6_ {NodeStatus::Disconnected}; + NodeStats stats4_ {}; + NodeStats stats6_ {}; + SockAddr publicAddress_; + + InfoHash myid {}; + + /** + * Store listen requests. + */ + struct Listener + { + size_t token; + std::shared_ptr req; + std::string key; + GetCallback cb; + Value::Filter filterChain; + std::thread thread; + unsigned callbackId; + std::shared_ptr pushNotifToken; // NOTE: unused if not using push notifications + }; + std::vector listeners_; + size_t listener_token_ {0}; + std::mutex lockListener_; + + /** + * Store current put and get requests. + */ + struct Operation + { + std::shared_ptr req; + std::thread thread; + std::shared_ptr finished; + }; + std::vector operations_; + std::mutex lockOperations_; + /** + * Callbacks should be executed in the main thread. + */ + std::vector> callbacks_; + std::mutex lockCallbacks; + + std::thread statusThread_; + mutable std::mutex lockCurrentProxyInfos_; + + Scheduler scheduler; + /** + * Retrieve if we can connect to the proxy (update statusIpvX_) + */ + void confirmProxy(); + Sp nextProxyConfirmation {}; + /** + * Relaunch LISTEN requests if the client disconnects/reconnects. + */ + void restartListeners(); + + /** + * If we want to use push notifications by default. + * NOTE: empty by default to avoid using services like FCM or APN.
+ */ + std::string deviceKey_ {}; + unsigned callbackId_ {0}; + std::mutex lockCallback_; + + const std::function loopSignal_; + +#if OPENDHT_PUSH_NOTIFICATIONS + void fillBodyToGetToken(std::shared_ptr request, unsigned callbackId); +#endif // OPENDHT_PUSH_NOTIFICATIONS + +}; + +} + +#endif // OPENDHT_PROXY_CLIENT diff -Nru opendht-1.5.0/include/opendht/dht_proxy_server.h opendht-1.6.0/include/opendht/dht_proxy_server.h --- opendht-1.5.0/include/opendht/dht_proxy_server.h 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/include/opendht/dht_proxy_server.h 2018-02-26 22:19:32.000000000 +0000 @@ -1,6 +1,6 @@ /* - * Copyright (C) 2017 Savoir-faire Linux Inc. - * Author : Sébastien Blin + * Copyright (C) 2017-2018 Savoir-faire Linux Inc. + * Author: Sébastien Blin * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -26,8 +26,13 @@ #include #include +#include #include +namespace Json { + class Value; +} + namespace dht { class DhtRunner; @@ -42,10 +47,11 @@ * Start the Http server for OpenDHT * @param dht the DhtRunner linked to this proxy server * @param port to listen + * @param pushServer where to push notifications * @note if the server fails to start (if port is already used or reserved), * it will fails silently */ - DhtProxyServer(std::shared_ptr dht, in_port_t port = 8000); + DhtProxyServer(std::shared_ptr dht, in_port_t port = 8000, const std::string& pushServer = ""); virtual ~DhtProxyServer(); DhtProxyServer(const DhtProxyServer& other) = delete; @@ -69,7 +75,7 @@ void getNodeInfo(const std::shared_ptr& session) const; /** - * Return Values of a InfoHash + * Return Values of an infoHash * Method: GET "/{InfoHash: .*}" * Return: Multiple JSON object in parts. Example: * Value in JSON format\n @@ -81,7 +87,7 @@ void get(const std::shared_ptr& session) const; /** - * Listen incoming Values of a InfoHash. + * Listen incoming Values of an infoHash. 
* Method: LISTEN "/{InfoHash: .*}" * Return: Multiple JSON object in parts. Example: * Value in JSON format\n @@ -128,7 +134,7 @@ #endif // OPENDHT_PROXY_SERVER_IDENTITY /** - * Return Values of a InfoHash filtered by a value id + * Return Values of an infoHash filtered by a value id * Method: GET "/{InfoHash: .*}/{ValueId: .*}" * Return: Multiple JSON object in parts. Example: * Value in JSON format\n @@ -144,9 +150,43 @@ * Method: OPTIONS "/{hash: .*}" * Return: HTTP 200 + Allow: allowed methods * See https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods/OPTIONS + * @param session */ void handleOptionsMethod(const std::shared_ptr& session) const; +#if OPENDHT_PUSH_NOTIFICATIONS + /** + * Subscribe to push notifications for an iOS or Android device. + * Method: SUBSCRIBE "/{InfoHash: .*}" + * Body: {"key": "device_key", (optional)"callback_id":y, + * (optional)"isAndroid":false (default true)}" + * Return: {"token": x}" where x if a token to save + * @note: the listen will timeout after six hours (and send a push notification). + * so you need to refresh the operation each six hours. + * @note: callback_id is used to add the possibility to have multiple listen + * on same hash for same device and must be > 0 + * @param session + */ + void subscribe(const std::shared_ptr& session) const; + /** + * Unsubscribe to push notifications for an iOS or Android device. 
+ * Method: UNSUBSCRIBE "/{InfoHash: .*}" + * Body: {"key": "device_key", "token": x, (optional)"callback_id":y" + * where x if the token to cancel + * Return: nothing + * @note: callback id is used to add the possibility to have multiple listen + * on same hash for same device + * @param session + */ + void unsubscribe(const std::shared_ptr& session) const; + /** + * Send a push notification via a gorush push gateway + * @param key of the device + * @param json, the content to send + */ + void sendPushNotification(const std::string& key, const Json::Value& json, bool isAndroid) const; +#endif //OPENDHT_PUSH_NOTIFICATIONS + std::thread server_thread {}; std::unique_ptr service_; std::shared_ptr dht_; @@ -160,7 +200,36 @@ std::future token; }; mutable std::vector currentListeners_; + mutable std::mutex lockListener_; std::atomic_bool stopListeners {false}; + +#if OPENDHT_PUSH_NOTIFICATIONS + struct PushListener { + std::string pushToken; + InfoHash hash; + unsigned token; + std::future internalToken; + std::chrono::steady_clock::time_point deadline; + bool started {false}; + unsigned callbackId {0}; + std::string clientId {}; + bool isAndroid {true}; + }; + mutable std::mutex lockPushListeners_; + mutable std::vector pushListeners_; + mutable unsigned tokenPushNotif_ {0}; +#endif //OPENDHT_PUSH_NOTIFICATIONS + const std::string pushServer_; + + /** + * Remove finished listeners + * @param testSession if we remove the listener only if the session is closed + */ + void removeClosedListeners(bool testSession = true); + /** + * Launch or remove push listeners if needed + */ + void handlePushListeners(); }; } diff -Nru opendht-1.5.0/include/opendht/dhtrunner.h opendht-1.6.0/include/opendht/dhtrunner.h --- opendht-1.5.0/include/opendht/dhtrunner.h 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/include/opendht/dhtrunner.h 2018-02-26 22:19:32.000000000 +0000 @@ -1,7 +1,8 @@ /* * Copyright (C) 2014-2017 Savoir-faire Linux Inc. 
- * Author(s) : Adrien Béraud - * Simon Désaulniers + * Authors: Adrien Béraud + * Simon Désaulniers + * Sébastien Blin * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -19,7 +20,6 @@ #pragma once -//#include "securedht.h" #include "infohash.h" #include "value.h" #include "callbacks.h" @@ -53,6 +53,13 @@ public: typedef std::function StatusCallback; + struct Config { + SecureDhtConfig dht_config; + bool threaded; + std::string proxy_server; + std::string push_node_id; + }; + DhtRunner(); virtual ~DhtRunner(); @@ -294,11 +301,6 @@ void registerCertificate(std::shared_ptr cert); void setLocalCertificateStore(CertificateStoreQuery&& query_method); - struct Config { - SecureDhtConfig dht_config; - bool threaded; - }; - /** * @param port: Local port to bind. Both IPv4 and IPv6 will be tried (ANY). * @param identity: RSA key pair to use for cryptographic operations. @@ -316,7 +318,9 @@ }, /*.id = */identity }, - /*.threaded = */threaded + /*.threaded = */threaded, + /*.proxy_server = */"", + /*.push_node_id = */"" }); } void run(in_port_t port, Config config); @@ -347,7 +351,13 @@ */ time_point loop() { std::lock_guard lck(dht_mtx); - return loop_(); + time_point wakeup = time_point::min(); + try { + wakeup = loop_(); + } catch (const dht::SocketException& e) { + startNetwork(bound4, bound6); + } + return wakeup; } /** @@ -362,6 +372,43 @@ */ void join(); + void setProxyServer(const std::string& proxy, const std::string& pushNodeId = "") { +#if OPENDHT_PROXY_CLIENT + if (config_.proxy_server == proxy and config_.push_node_id == pushNodeId) + return; + config_.proxy_server = proxy; + config_.push_node_id = pushNodeId; + enableProxy(use_proxy and not config_.proxy_server.empty()); +#endif + } + + /** + * Start or stop the proxy + * @param proxify if we want to use the proxy + * @param deviceKey non empty to enable push notifications + */ + void enableProxy(bool proxify); + + 
/* Push notification methods */ + + /** + * Updates the push notification device token + */ + void setPushNotificationToken(const std::string& token); + + /** + * Insert a push notification to process for OpenDHT + */ + void pushNotificationReceived(const std::map& data) const; + /** + * Refresh a listen via a token + * @param token + */ + void resubscribe(unsigned token); + + /* Proxy server mothods */ + void forwardAllMessages(bool forward); + private: static constexpr std::chrono::seconds BOOTSTRAP_PERIOD {10}; @@ -373,14 +420,48 @@ */ void tryBootstrapContinuously(); - void doRun(const SockAddr& sin4, const SockAddr& sin6, SecureDhtConfig config); + void startNetwork(const SockAddr sin4, const SockAddr sin6); time_point loop_(); NodeStatus getStatus() const { return std::max(status4, status6); } + /** Local DHT instance */ std::unique_ptr dht_; + + /** Proxy client instance */ + std::unique_ptr dht_via_proxy_; + + /** true if we are currently using a proxy */ + std::atomic_bool use_proxy {false}; + + /** Current configuration */ + Config config_; + + /** + * reset dht clients + */ + void resetDht(); + /** + * @return the current active DHT + */ + SecureDht* activeDht() const; + + /** + * Store current listeners and translates global tokens for each client. 
+ */ + struct Listener { + size_t tokenClassicDht; + size_t tokenProxyDht; + GetCallback gcb; + InfoHash hash; + Value::Filter f; + Where w; + }; + std::map listeners_ {}; + size_t listener_token_ {1}; + mutable std::mutex dht_mtx {}; std::thread dht_thread {}; std::condition_variable cv {}; @@ -403,14 +484,19 @@ std::queue> pending_ops {}; std::mutex storage_mtx {}; - std::atomic running {false}; + std::atomic_bool running {false}; + std::atomic_bool running_network {false}; NodeStatus status4 {NodeStatus::Disconnected}, status6 {NodeStatus::Disconnected}; StatusCallback statusCb {nullptr}; + int s4 {-1}, s6 {-1}; SockAddr bound4 {}; SockAddr bound6 {}; + + /** Push notification token */ + std::string pushToken_; }; } diff -Nru opendht-1.5.0/include/opendht/infohash.h opendht-1.6.0/include/opendht/infohash.h --- opendht-1.5.0/include/opendht/infohash.h 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/include/opendht/infohash.h 2018-02-26 22:19:32.000000000 +0000 @@ -378,7 +378,7 @@ void msgpack_unpack(msgpack::object o); - OPENDHT_PUBLIC friend std::ostream& operator<< (std::ostream& s, const InfoHash& h); + OPENDHT_PUBLIC friend std::ostream& operator<< (std::ostream& s, const NodeExport& h); }; } diff -Nru opendht-1.5.0/include/opendht/network_engine.h opendht-1.6.0/include/opendht/network_engine.h --- opendht-1.5.0/include/opendht/network_engine.h 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/include/opendht/network_engine.h 2018-02-26 22:19:32.000000000 +0000 @@ -202,8 +202,8 @@ using RequestCb = std::function; using RequestExpiredCb = std::function; - NetworkEngine(Logger& log, Scheduler& scheduler); - NetworkEngine(InfoHash& myid, NetId net, int s, int s6, Logger& log, Scheduler& scheduler, + NetworkEngine(Logger& log, Scheduler& scheduler, const int& s = -1, const int& s6 = -1); + NetworkEngine(InfoHash& myid, NetId net, const int& s, const int& s6, Logger& log, Scheduler& scheduler, decltype(NetworkEngine::onError) onError, 
decltype(NetworkEngine::onNewNode) onNewNode, decltype(NetworkEngine::onReportedAddr) onReportedAddr, @@ -505,8 +505,8 @@ /* DHT info */ const InfoHash& myid; const NetId network {0}; - const int dht_socket {-1}; - const int dht_socket6 {-1}; + const int& dht_socket; + const int& dht_socket6; const Logger& DHT_LOG; NodeCache cache {}; diff -Nru opendht-1.5.0/include/opendht/node.h opendht-1.6.0/include/opendht/node.h --- opendht-1.5.0/include/opendht/node.h 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/include/opendht/node.h 2018-02-26 22:19:32.000000000 +0000 @@ -117,7 +117,7 @@ */ Tid openSocket(SocketCb&& cb); - Socket* getSocket(Tid id); + Sp getSocket(Tid id); /** * Closes a socket so that no further data will be red on that socket. @@ -167,7 +167,7 @@ using TransactionDist = std::uniform_int_distribution; std::map> requests_ {}; - std::map sockets_; + std::map> sockets_; }; } diff -Nru opendht-1.5.0/include/opendht/securedht.h opendht-1.6.0/include/opendht/securedht.h --- opendht-1.5.0/include/opendht/securedht.h 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/include/opendht/securedht.h 2018-02-26 22:19:32.000000000 +0000 @@ -1,7 +1,8 @@ /* * Copyright (C) 2014-2017 Savoir-faire Linux Inc. 
- * Author : Adrien Béraud + * Authors: Adrien Béraud * Simon Désaulniers + * Sébastien Blin * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -29,13 +30,21 @@ namespace dht { -class OPENDHT_PUBLIC SecureDht : public Dht { +class OPENDHT_PUBLIC SecureDht final : public DhtInterface { public: typedef std::function SignatureCheckCallback; using Config = SecureDhtConfig; + static dht::Config& getConfig(SecureDht::Config& conf) + { + auto& c = conf.node_config; + if (not c.node_id and conf.id.second) + c.node_id = InfoHash::get("node:"+conf.id.second->getId().toString()); + return c; + } + SecureDht() {} /** @@ -44,7 +53,7 @@ * id: the identity to use for the crypto layer and to compute * our own hash on the Dht. */ - SecureDht(int s, int s6, Config config); + SecureDht(std::unique_ptr dht, Config config); virtual ~SecureDht(); @@ -62,14 +71,17 @@ return secureType(std::move(tmp_type)); } - virtual void registerType(const ValueType& type) override { - Dht::registerType(secureType(type)); - } - virtual void registerType(ValueType&& type) { - Dht::registerType(secureType(std::forward(type))); - } - virtual void registerInsecureType(const ValueType& type) { - Dht::registerType(type); + void registerType(const ValueType& type) { + if (dht_) + dht_->registerType(secureType(type)); + } + void registerType(ValueType&& type) { + if (dht_) + dht_->registerType(secureType(std::forward(type))); + } + void registerInsecureType(const ValueType& type) { + if (dht_) + dht_->registerType(type); } /** @@ -77,18 +89,18 @@ * If the signature can't be checked, or if the data can't be decrypted, it is not returned. * Public, non-signed & non-encrypted data is retransmitted as-is. 
*/ - virtual void get(const InfoHash& id, GetCallback cb, DoneCallback donecb={}, Value::Filter&& = {}, Where&& w = {}) override; - virtual void get(const InfoHash& id, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter&& f = {}, Where&& w = {}) override { + void get(const InfoHash& id, GetCallback cb, DoneCallback donecb={}, Value::Filter&& = {}, Where&& w = {}); + void get(const InfoHash& id, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter&& f = {}, Where&& w = {}) { get(id, cb, bindDoneCb(donecb), std::forward(f), std::forward(w)); } - virtual void get(const InfoHash& key, GetCallbackSimple cb, DoneCallback donecb={}, Value::Filter&& f={}, Where&& w = {}) override { + void get(const InfoHash& key, GetCallbackSimple cb, DoneCallback donecb={}, Value::Filter&& f={}, Where&& w = {}) { get(key, bindGetCb(cb), donecb, std::forward(f), std::forward(w)); } - virtual void get(const InfoHash& key, GetCallbackSimple cb, DoneCallbackSimple donecb, Value::Filter&& f={}, Where&& w = {}) override { + void get(const InfoHash& key, GetCallbackSimple cb, DoneCallbackSimple donecb, Value::Filter&& f={}, Where&& w = {}) { get(key, bindGetCb(cb), bindDoneCb(donecb), std::forward(f), std::forward(w)); } - virtual size_t listen(const InfoHash& id, GetCallback cb, Value::Filter&& = {}, Where&& w = {}) override; + size_t listen(const InfoHash& id, GetCallback cb, Value::Filter = {}, Where w = {}); /** * Will take ownership of the value, sign it using our private key and put it in the DHT. 
@@ -135,7 +147,193 @@ localQueryMethod_ = std::move(query_method); } + /** + * SecureDht to Dht proxy + */ + void shutdown(ShutdownCallback cb) { + dht_->shutdown(cb); + } + void dumpTables() const { + dht_->dumpTables(); + } + inline const InfoHash& getNodeId() const { return dht_->getNodeId(); } + std::pair getStoreSize() const { + return dht_->getStoreSize(); + } + std::string getStorageLog() const { + return dht_->getStorageLog(); + } + std::string getStorageLog(const InfoHash& h) const { + return dht_->getStorageLog(h); + } + void setStorageLimit(size_t limit = DEFAULT_STORAGE_LIMIT) { + dht_->setStorageLimit(limit); + } + std::vector exportNodes() { + return dht_->exportNodes(); + } + std::vector exportValues() const { + return dht_->exportValues(); + } + void importValues(const std::vector& v) { + dht_->importValues(v); + } + NodeStats getNodesStats(sa_family_t af) const { + return dht_->getNodesStats(af); + } + std::vector getNodeMessageStats(bool in = false) { + return dht_->getNodeMessageStats(in); + } + std::string getRoutingTablesLog(sa_family_t af) const { + return dht_->getRoutingTablesLog(af); + } + std::string getSearchesLog(sa_family_t af) const { + return dht_->getSearchesLog(af); + } + std::string getSearchLog(const InfoHash& h, sa_family_t af = AF_UNSPEC) const { + return dht_->getSearchLog(h, af); + } + std::vector getPublicAddress(sa_family_t family = 0) { + return dht_->getPublicAddress(family); + } + time_point periodic(const uint8_t *buf, size_t buflen, const SockAddr& sa) { + return dht_->periodic(buf, buflen, sa); + } + time_point periodic(const uint8_t *buf, size_t buflen, const sockaddr* from, socklen_t fromlen) { + return dht_->periodic(buf, buflen, from, fromlen); + } + NodeStatus getStatus(sa_family_t af) const { + return dht_->getStatus(af); + } + NodeStatus getStatus() const { + return dht_->getStatus(); + } + bool isRunning(sa_family_t af = 0) const { + return dht_->isRunning(af); + } + const ValueType& getType(ValueType::Id 
type_id) const { + return dht_->getType(type_id); + } + void insertNode(const InfoHash& id, const SockAddr& sa) { + dht_->insertNode(id, sa); + } + void insertNode(const InfoHash& id, const sockaddr* sa, socklen_t salen) { + dht_->insertNode(id, sa, salen); + } + void insertNode(const NodeExport& n) { + dht_->insertNode(n); + } + void pingNode(const sockaddr* sa, socklen_t salen, DoneCallbackSimple&& cb={}) { + dht_->pingNode(sa, salen, std::move(cb)); + } + void query(const InfoHash& key, QueryCallback cb, DoneCallback done_cb = {}, Query&& q = {}) { + dht_->query(key, cb, done_cb, std::move(q)); + } + void query(const InfoHash& key, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query&& q = {}) { + dht_->query(key, cb, done_cb, std::move(q)); + } + std::vector> getLocal(const InfoHash& key, Value::Filter f = Value::AllFilter()) const { + return dht_->getLocal(key, f); + } + Sp getLocalById(const InfoHash& key, Value::Id vid) const { + return dht_->getLocalById(key, vid); + } + void put(const InfoHash& key, + Sp v, + DoneCallback cb=nullptr, + time_point created=time_point::max(), + bool permanent = false) + { + dht_->put(key, v, cb, created, permanent); + } + void put(const InfoHash& key, + const Sp& v, + DoneCallbackSimple cb, + time_point created=time_point::max(), + bool permanent = false) + { + dht_->put(key, v, cb, created, permanent); + } + + void put(const InfoHash& key, + Value&& v, + DoneCallback cb=nullptr, + time_point created=time_point::max(), + bool permanent = false) + { + dht_->put(key, std::move(v), cb, created, permanent); + } + void put(const InfoHash& key, + Value&& v, + DoneCallbackSimple cb, + time_point created=time_point::max(), + bool permanent = false) + { + dht_->put(key, std::move(v), cb, created, permanent); + } + std::vector> getPut(const InfoHash& h) { + return dht_->getPut(h); + } + Sp getPut(const InfoHash& h, const Value::Id& vid) { + return dht_->getPut(h, vid); + } + bool cancelPut(const InfoHash& h, const Value::Id& vid) { 
+ return dht_->cancelPut(h, vid); + } + size_t listen(const InfoHash& key, GetCallbackSimple cb, Value::Filter f={}, Where w = {}) { + return dht_->listen(key, cb, f, w); + } + bool cancelListen(const InfoHash& h, size_t token) { + return dht_->cancelListen(h, token); + } + void connectivityChanged(sa_family_t af) { + dht_->connectivityChanged(af); + } + void connectivityChanged() { + dht_->connectivityChanged(); + } + + void forwardAllMessages(bool forward) { + forward_all_ = forward; + } + + void setPushNotificationToken(const std::string& token = "") { + dht_->setPushNotificationToken(token); + } + + /** + * Call linked callback with push_notification + * @param notification to process + */ + void pushNotificationReceived(const std::map& notification) { + dht_->pushNotificationReceived(notification); + } + /** + * Refresh a listen via a token + * @param token + */ + void resubscribe(const unsigned token) { + dht_->resubscribe(token); + } + + void setLoggers(LogMethod error = NOLOG, LogMethod warn = NOLOG, LogMethod debug = NOLOG) + { + DHT_LOG.DEBUG = debug; + DHT_LOG.WARN = warn; + DHT_LOG.ERR = error; + dht_->setLoggers(std::forward(error), std::forward(warn), std::forward(debug)); + } + + /** + * Only print logs related to the given InfoHash (if given), or disable filter (if zeroes). 
+ */ + void setLogFilter(const InfoHash& f) { + DHT_LOG.setFilter(f); + dht_->setLogFilter(f); + } + private: + std::unique_ptr dht_; // prevent copy SecureDht(const SecureDht&) = delete; SecureDht& operator=(const SecureDht&) = delete; @@ -153,6 +351,8 @@ std::map> nodesPubKeys_ {}; std::uniform_int_distribution rand_id {}; + + std::atomic_bool forward_all_ {false}; }; const ValueType CERTIFICATE_TYPE = { diff -Nru opendht-1.5.0/include/opendht/sockaddr.h opendht-1.6.0/include/opendht/sockaddr.h --- opendht-1.5.0/include/opendht/sockaddr.h 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/include/opendht/sockaddr.h 2018-02-26 22:19:32.000000000 +0000 @@ -28,6 +28,8 @@ #endif #else #include +#include +#include #include #include typedef uint16_t sa_family_t; @@ -37,6 +39,7 @@ #include #include #include +#include #include #include @@ -67,6 +70,19 @@ throw std::runtime_error("Socket address length is too large"); set(sa, length); } + SockAddr(const sockaddr* sa) { + socklen_t len = 0; + if (sa) { + if (sa->sa_family == AF_INET) + len = sizeof(sockaddr_in); + else if(sa->sa_family == AF_INET6) + len = sizeof(sockaddr_in6); + else + throw std::runtime_error("Unknown address family"); + } + set(sa, len); + } + /** * Build from an existing sockaddr_storage structure. */ @@ -123,7 +139,7 @@ } if (new_length != len) { len = new_length; - if (len) addr.reset((sockaddr*)std::calloc(len, 1)); + if (len) addr.reset((sockaddr*)::calloc(len, 1)); else addr.reset(); } if (len > sizeof(sa_family_t)) @@ -209,6 +225,9 @@ bool isUnspecified() const; + bool isMappedIPv4() const; + SockAddr getMappedIPv4() const; + /** * A comparator to classify IP addresses, only considering the * first 64 bits in IPv6. 
@@ -239,13 +258,13 @@ }; private: socklen_t len {0}; - struct free_delete { void operator()(void* p) { std::free(p); } }; + struct free_delete { void operator()(void* p) { ::free(p); } }; std::unique_ptr addr {}; void set(const sockaddr* sa, socklen_t length) { if (len != length) { len = length; - if (len) addr.reset((sockaddr*)std::malloc(len)); + if (len) addr.reset((sockaddr*)::malloc(len)); else addr.reset(); } if (len) diff -Nru opendht-1.5.0/include/opendht/utils.h opendht-1.6.0/include/opendht/utils.h --- opendht-1.5.0/include/opendht/utils.h 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/include/opendht/utils.h 2018-02-26 22:19:32.000000000 +0000 @@ -55,9 +55,15 @@ } class OPENDHT_PUBLIC DhtException : public std::runtime_error { - public: - DhtException(const std::string &str = "") : - std::runtime_error("DhtException occurred: " + str) {} +public: + DhtException(const std::string &str = "") : + std::runtime_error("DhtException occurred: " + str) {} +}; + +class OPENDHT_PUBLIC SocketException : public DhtException { +public: + SocketException(int err) : + DhtException(strerror(err)) {} }; // Time related definitions and utility functions diff -Nru opendht-1.5.0/include/opendht/value.h opendht-1.6.0/include/opendht/value.h --- opendht-1.5.0/include/opendht/value.h 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/include/opendht/value.h 2018-02-26 22:19:32.000000000 +0000 @@ -37,7 +37,7 @@ #include #include -#if OPENDHT_PROXY_SERVER +#if OPENDHT_PROXY_SERVER || OPENDHT_PROXY_CLIENT #include #endif //OPENDHT_PROXY_SERVER @@ -350,7 +350,7 @@ Value(ValueType::Id t, const uint8_t* dat_ptr, size_t dat_len, Id id = INVALID_ID) : id(id), type(t), data(dat_ptr, dat_ptr+dat_len) {} -#if OPENDHT_PROXY_SERVER +#if OPENDHT_PROXY_SERVER || OPENDHT_PROXY_CLIENT /** * Build a value from a json object * @param json @@ -429,7 +429,7 @@ return ss.str(); } -#if OPENDHT_PROXY_SERVER +#if OPENDHT_PROXY_SERVER || OPENDHT_PROXY_CLIENT /** * Build a json object from a 
value * Example: diff -Nru opendht-1.5.0/MSVC/opendht.vcxproj opendht-1.6.0/MSVC/opendht.vcxproj --- opendht-1.5.0/MSVC/opendht.vcxproj 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/MSVC/opendht.vcxproj 2018-02-26 22:19:32.000000000 +0000 @@ -19,11 +19,14 @@ + + + @@ -36,6 +39,8 @@ + + @@ -43,6 +48,7 @@ + @@ -51,7 +57,6 @@ - @@ -59,9 +64,11 @@ - + + + @@ -215,9 +222,9 @@ true true true - $(ProjectDir)contrib\build\include;$(ProjectDir)..\include;$(ProjectDir)..\include\opendht;$(ProjectDir)contrib\build\msgpack-c\include;$(ProjectDir)contrib\build\argon2\include + $(ProjectDir)contrib\build\include;$(ProjectDir)..\include;$(ProjectDir)..\include\opendht;$(ProjectDir)contrib\build\msgpack-c\include;$(ProjectDir)contrib\build\argon2\include;$(ProjectDir)..\..\include;$(ProjectDir)..\..\argon2\include _CRT_SECURE_NO_WARNINGS;WIN32_NATIVE;WIN32_LEAN_AND_MEAN;_MBCS;%(PreprocessorDefinitions) - 4804;4800;4101;4267;4244;4503; + 4804;4800;4101;4267;4244;4503;4273; -D_SCL_SECURE_NO_WARNINGS %(AdditionalOptions) $(OutDir)\lib\x64\$(TargetName).pdb diff -Nru opendht-1.5.0/MSVC/opendht.vcxproj.filters opendht-1.6.0/MSVC/opendht.vcxproj.filters --- opendht-1.5.0/MSVC/opendht.vcxproj.filters 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/MSVC/opendht.vcxproj.filters 2018-02-26 22:19:32.000000000 +0000 @@ -31,9 +31,6 @@ Source Files - - Source Files - Source Files @@ -46,80 +43,102 @@ Source Files + + Source Files + + + Source Files + + + Source Files\indexation + + - + + + + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + Header Files + + Header Files\opendht + - Header Files + Header Files\opendht - Header Files + Header Files\opendht - Header Files + Header Files\opendht + + + Header Files\opendht - Header Files + Header Files\opendht - Header Files - - - Header Files + Header Files\opendht - Header Files + Header Files\opendht - Header Files + Header Files\opendht - Header Files + Header Files\opendht - Header 
Files + Header Files\opendht - Header Files - - - Header Files - - - Header Files + Header Files\opendht - Header Files - - - Header Files + Header Files\opendht - Header Files + Header Files\opendht - Header Files + Header Files\opendht - Header Files - - - Header Files + Header Files\opendht - Header Files - - - Header Files + Header Files\opendht - Header Files + Header Files\opendht - Header Files + Header Files\opendht + + + Header Files\opendht\indexation + + @@ -128,5 +147,14 @@ {d1ab5bfe-3ab1-45ee-9324-b4b071887668} + + {939134d3-d5a2-4458-bca8-a10ea9042ea2} + + + {0b46e2d0-4874-404c-bae3-fae8d49b85b8} + + + {03bd5139-2ae5-41b0-a77b-944814e1b2b6} + \ No newline at end of file diff -Nru opendht-1.5.0/README.md opendht-1.6.0/README.md --- opendht-1.5.0/README.md 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/README.md 2018-02-26 22:19:32.000000000 +0000 @@ -91,9 +91,9 @@ - msgpack-c 1.2+, used for data serialization. - GnuTLS 3.3+, used for cryptographic operations. - Nettle 2.4+, a GnuTLS dependency for crypto. -- (optional) restbed 4.0+, used for the REST API. +- (optional) restbed used for the REST API. commit fb84213e170bc171fecd825a8e47ed9f881a12cd (https://github.com/AmarOk1412/restbed/tree/async_read_until) - (optional) jsoncpp 1.7.4-3+, used for the REST API. -- Build tested with GCC 4.8+ (GNU/Linux, Android, Windows with MinGW), Clang/LLVM (Linux, macOS). +- Build tested with GCC 5.2+ (GNU/Linux, Android, Windows with MinGW), Clang/LLVM (Linux, macOS). 
- Build tested with Microsoft Visual Studio 2015 ## Contact diff -Nru opendht-1.5.0/src/callbacks.cpp opendht-1.6.0/src/callbacks.cpp --- opendht-1.5.0/src/callbacks.cpp 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/src/callbacks.cpp 2018-02-26 22:19:32.000000000 +0000 @@ -68,7 +68,7 @@ return ss.str(); } -#if OPENDHT_PROXY_SERVER +#if OPENDHT_PROXY_SERVER || OPENDHT_PROXY_CLIENT /** * Build a json object from a NodeStats */ @@ -86,6 +86,18 @@ } return val; } + +NodeStats::NodeStats(const Json::Value& val) +{ + if (val.isMember("good")) + good_nodes = static_cast(val["good"].asLargestUInt()); + if (val.isMember("dubious")) + dubious_nodes = static_cast(val["dubious"].asLargestUInt()); + if (val.isMember("incoming")) + incoming_nodes = static_cast(val["incoming"].asLargestUInt()); + if (val.isMember("table_depth")) + table_depth = static_cast(val["table_depth"].asLargestUInt()); +} #endif //OPENDHT_PROXY_SERVER } diff -Nru opendht-1.5.0/src/crypto.cpp opendht-1.6.0/src/crypto.cpp --- opendht-1.5.0/src/crypto.cpp 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/src/crypto.cpp 2018-02-26 22:19:32.000000000 +0000 @@ -538,7 +538,11 @@ return {}; InfoHash id; size_t sz = id.size(); +#if GNUTLS_VERSION_NUMBER < 0x030401 + const int flags = 0; +#else const int flags = (id.size() == 32) ? 
GNUTLS_KEYID_USE_SHA256 : 0; +#endif if (auto err = gnutls_pubkey_get_key_id(pk, flags, id.data(), &sz)) throw CryptoException(std::string("Can't get public key ID: ") + gnutls_strerror(err)); if (sz != id.size()) @@ -842,6 +846,15 @@ } std::chrono::system_clock::time_point +Certificate::getActivation() const +{ + auto t = gnutls_x509_crt_get_activation_time(cert); + if (t == (time_t)-1) + return std::chrono::system_clock::time_point::min(); + return std::chrono::system_clock::from_time_t(t); +} + +std::chrono::system_clock::time_point Certificate::getExpiration() const { auto t = gnutls_x509_crt_get_expiration_time(cert); @@ -913,8 +926,13 @@ return {}; Certificate ret {cert}; - gnutls_x509_crt_set_activation_time(cert, time(NULL)); - gnutls_x509_crt_set_expiration_time(cert, time(NULL) + (20 * 365 * 24 * 60 * 60)); + int64_t now = time(NULL); + /* 2038 bug: don't allow time wrap */ + auto boundTime = [](int64_t t) -> time_t { + return std::min(t, std::numeric_limits::max()); + }; + gnutls_x509_crt_set_activation_time(cert, boundTime(now)); + gnutls_x509_crt_set_expiration_time(cert, boundTime(now + (10 * 365 * 24 * 60 * 60))); if (gnutls_x509_crt_set_key(cert, key.x509_key) != GNUTLS_E_SUCCESS) { std::cerr << "Error when setting certificate key" << std::endl; return {}; @@ -1287,8 +1305,10 @@ o << "* Certificate has expired" << std::endl; if (h.result & GNUTLS_CERT_UNEXPECTED_OWNER) o << "* The owner is not the expected one" << std::endl; +#if GNUTLS_VERSION_NUMBER >= 0x030401 if (h.result & GNUTLS_CERT_PURPOSE_MISMATCH) o << "* Certificate or an intermediate does not match the intended purpose" << std::endl; +#endif if (h.result & GNUTLS_CERT_MISMATCH) o << "* Certificate presented isn't the expected one" << std::endl; } else { diff -Nru opendht-1.5.0/src/dht.cpp opendht-1.6.0/src/dht.cpp --- opendht-1.5.0/src/dht.cpp 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/src/dht.cpp 2018-02-26 22:19:32.000000000 +0000 @@ -1,5 +1,5 @@ /* - * Copyright (C) 
2014-2017 Savoir-faire Linux Inc. + * Copyright (C) 2014-2018 Savoir-faire Linux Inc. * Author(s) : Adrien Béraud * Simon Désaulniers * @@ -39,14 +39,6 @@ constexpr std::chrono::seconds Dht::LISTEN_EXPIRE_TIME; constexpr std::chrono::seconds Dht::REANNOUNCE_MARGIN; -void -Dht::setLoggers(LogMethod error, LogMethod warn, LogMethod debug) -{ - DHT_LOG.DEBUG = debug; - DHT_LOG.WARN = warn; - DHT_LOG.ERR = error; -} - NodeStatus Dht::getStatus(sa_family_t af) const { @@ -138,7 +130,7 @@ auto& s = *it->second; if (s.insertNode(node, now)) { inserted = true; - scheduler.edit(s.nextSearchStep, s.getNextStepTime(now)); + scheduler.edit(s.nextSearchStep, now); } else if (not s.expired and not s.done) break; ++it; @@ -150,7 +142,7 @@ auto& s = *it->second; if (s.insertNode(node, now)) { inserted = true; - scheduler.edit(s.nextSearchStep, s.getNextStepTime(now)); + scheduler.edit(s.nextSearchStep, now); } else if (not s.expired and not s.done) break; } @@ -225,6 +217,7 @@ { const auto& now = scheduler.time(); if (auto sr = ws.lock()) { + sr->insertNode(req.node, now, answer.ntoken); if (auto srn = sr->getNode(req.node)) { /* all other get requests which are satisfied by this answer should not be sent anymore */ @@ -236,8 +229,12 @@ srn->getStatus[q] = std::move(dummy_req); } } + auto syncTime = srn->getSyncTime(scheduler.time()); + if (srn->syncJob) + scheduler.edit(srn->syncJob, syncTime); + else + srn->syncJob = scheduler.add(syncTime, std::bind(&Dht::searchStep, this, sr)); } - sr->insertNode(req.node, now, answer.ntoken); onGetValuesDone(req.node, answer, sr, query); } } @@ -344,8 +341,8 @@ if (not n) return nullptr; - DHT_LOG.w(sr->id, n->node->id, "[search %s] [node %s] sending 'find_node'", - sr->id.toString().c_str(), n->node->toString().c_str()); + /*DHT_LOG.d(sr->id, n->node->id, "[search %s] [node %s] sending 'find_node'", + sr->id.toString().c_str(), n->node->toString().c_str());*/ n->getStatus[query] = network_engine.sendFindNode(n->node, sr->id, -1, @@ -358,8 
+355,8 @@ if (query and not query->select.getSelection().empty()) { /* The request contains a select. No need to paginate... */ - DHT_LOG.w(sr->id, n->node->id, "[search %s] [node %s] sending 'get'", - sr->id.toString().c_str(), n->node->toString().c_str()); + /*DHT_LOG.d(sr->id, n->node->id, "[search %s] [node %s] sending 'get'", + sr->id.toString().c_str(), n->node->toString().c_str());*/ n->getStatus[query] = network_engine.sendGetValues(n->node, sr->id, *query, @@ -419,7 +416,7 @@ const auto& now = scheduler.time(); if (not sn->isSynced(now)) { /* Search is now unsynced. Let's call searchStep to sync again. */ - scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(now)); + scheduler.edit(sr->nextSearchStep, now); return; } for (auto& a : sr->announce) { @@ -557,6 +554,8 @@ { /* on done */ if (auto sr = ws.lock()) { scheduler.edit(sr->nextSearchStep, scheduler.time()); + if (auto sn = sr->getNode(req.node)) + scheduler.add(sn->getListenTime(query), std::bind(&Dht::searchStep, this, sr)); onListenDone(req.node, answer, sr); } }, @@ -614,8 +613,8 @@ /* dumpSearch(*sr, std::cout); */ /* periodic searchStep scheduling. 
*/ - if (not sr->done) - scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(now)); + //if (not sr->done) + // scheduler.edit(sr->nextSearchStep, now); } unsigned Dht::refill(Dht::Search& sr) { @@ -703,7 +702,7 @@ refill(*sr); if (sr->nextSearchStep) - scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(scheduler.time())); + scheduler.edit(sr->nextSearchStep, scheduler.time()); else sr->nextSearchStep = scheduler.add(scheduler.time(), std::bind(&Dht::searchStep, this, sr)); @@ -790,12 +789,12 @@ sr->done = false; auto token = ++sr->listener_token; sr->listeners.emplace(token, LocalListener{q, f, cb}); - scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(now)); + scheduler.edit(sr->nextSearchStep, now); return token; } size_t -Dht::listen(const InfoHash& id, GetCallback cb, Value::Filter&& f, Where&& where) +Dht::listen(const InfoHash& id, GetCallback cb, Value::Filter f, Where where) { scheduler.syncTime(); @@ -1138,7 +1137,6 @@ void Dht::storageChanged(const InfoHash& id, Storage& st, ValueStorage& v) { - DHT_LOG.d(id, "[store %s] changed", id.toString().c_str()); if (not st.local_listeners.empty()) { DHT_LOG.d(id, "[store %s] %lu local listeners", id.toString().c_str(), st.local_listeners.size()); std::vector>>> cbs; @@ -1158,20 +1156,22 @@ cb.first(cb.second); } - DHT_LOG.d(id, "[store %s] %lu remote listeners", id.toString().c_str(), st.listeners.size()); - for (const auto& node_listeners : st.listeners) { - for (const auto& l : node_listeners.second) { - auto f = l.second.query.where.getFilter(); - if (f and not f(*v.data)) - continue; - DHT_LOG.w(id, node_listeners.first->id, "[store %s] [node %s] sending update", - id.toString().c_str(), - node_listeners.first->toString().c_str()); - std::vector> vals {}; - vals.push_back(v.data); - Blob ntoken = makeToken(node_listeners.first->getAddr(), false); - network_engine.tellListener(node_listeners.first, l.second.sid, id, 0, ntoken, {}, {}, - std::move(vals), l.second.query); + if (not 
st.listeners.empty()) { + DHT_LOG.d(id, "[store %s] %lu remote listeners", id.toString().c_str(), st.listeners.size()); + for (const auto& node_listeners : st.listeners) { + for (const auto& l : node_listeners.second) { + auto f = l.second.query.where.getFilter(); + if (f and not f(*v.data)) + continue; + DHT_LOG.w(id, node_listeners.first->id, "[store %s] [node %s] sending update", + id.toString().c_str(), + node_listeners.first->toString().c_str()); + std::vector> vals {}; + vals.push_back(v.data); + Blob ntoken = makeToken(node_listeners.first->getAddr(), false); + network_engine.tellListener(node_listeners.first, l.first, id, 0, ntoken, {}, {}, + std::move(vals), l.second.query); + } } } } @@ -1232,10 +1232,10 @@ buckets4.findClosestNodes(id, now, TARGET_NODES), buckets6.findClosestNodes(id, now, TARGET_NODES), std::move(vals), query); } - node_listeners->second.emplace(socket_id, Listener {socket_id, now, std::forward(query)}); + node_listeners->second.emplace(socket_id, Listener {now, std::forward(query)}); } else - l->second.refresh(socket_id, now, std::forward(query)); + l->second.refresh(now, std::forward(query)); } void @@ -1442,10 +1442,8 @@ out << " [expired]"; bool synced = sr.isSynced(now); out << (synced ? " [synced]" : " [not synced]"); - if (synced && sr.isListening(now)) { - auto lt = sr.getListenTime(now); - out << " [listening, next in " << duration_cast(lt-now).count() << " s]"; - } + if (synced && sr.isListening(now)) + out << " [listening]"; out << std::endl; /*printing the queries*/ @@ -1657,7 +1655,7 @@ Dht::Dht() : store(), scheduler(DHT_LOG), network_engine(DHT_LOG, scheduler) {} -Dht::Dht(int s, int s6, Config config) +Dht::Dht(const int& s, const int& s6, Config config) : myid(config.node_id ? 
config.node_id : InfoHash::getRandom()), is_bootstrap(config.is_bootstrap), maintain_storage(config.maintain_storage), store(), store_quota(), @@ -2067,6 +2065,7 @@ n.token.clear(); n.last_get_reply = time_point::min(); searchSendGetValues(sr); + scheduler.edit(sr->nextSearchStep, scheduler.time()); break; } } @@ -2121,8 +2120,6 @@ if (st != store.end() && not st->second.empty()) { answer.values = st->second.get(query.where.getFilter()); DHT_LOG.d(hash, "[node %s] sending %u values", node->toString().c_str(), answer.values.size()); - } else { - DHT_LOG.d(hash, "[node %s] sending nodes", node->toString().c_str()); } return answer; } @@ -2137,14 +2134,13 @@ return; } - DHT_LOG.d(sr->id, "[search %s] [node %s] got reply to 'get' with %u nodes", - sr->id.toString().c_str(), node->toString().c_str(), a.nodes4.size()+a.nodes6.size()); + /*DHT_LOG.d(sr->id, "[search %s] [node %s] got reply to 'get' with %u nodes", + sr->id.toString().c_str(), node->toString().c_str(), a.nodes4.size()+a.nodes6.size());*/ if (not a.ntoken.empty()) { if (not a.values.empty() or not a.fields.empty()) { - DHT_LOG.d(sr->id, "[search %s IPv%c] found %u values", - sr->id.toString().c_str(), sr->af == AF_INET ? 
'4' : '6', - a.values.size()); + DHT_LOG.d(sr->id, node->id, "[search %s] [node %s] found %u values", + sr->id.toString().c_str(), node->toString().c_str(), a.values.size()); for (auto& getp : sr->callbacks) { /* call all callbacks for this search */ auto& get = getp.second; if (not (get.get_cb or get.query_cb) or @@ -2229,7 +2225,7 @@ if (not sr->done) { const auto& now = scheduler.time(); searchSendGetValues(sr); - scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(now)); + scheduler.edit(sr->nextSearchStep, now); } } diff -Nru opendht-1.5.0/src/dht_proxy_client.cpp opendht-1.6.0/src/dht_proxy_client.cpp --- opendht-1.5.0/src/dht_proxy_client.cpp 1970-01-01 00:00:00.000000000 +0000 +++ opendht-1.6.0/src/dht_proxy_client.cpp 2018-02-26 22:19:32.000000000 +0000 @@ -0,0 +1,796 @@ +/* + * Copyright (C) 2016-2018 Savoir-faire Linux Inc. + * Author: Sébastien Blin + * Adrien Béraud + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . 
+ */ + +#if OPENDHT_PROXY_CLIENT + +#include "dht_proxy_client.h" + +#include +#include +#include +#include + +#include "dhtrunner.h" + +constexpr const char* const HTTP_PROTO {"http://"}; + +namespace dht { + +DhtProxyClient::DhtProxyClient(std::function signal, const std::string& serverHost, const std::string& pushClientId) +: serverHost_(serverHost), pushClientId_(pushClientId), scheduler(DHT_LOG), loopSignal_(signal) +{ + if (!serverHost_.empty()) + startProxy(); +} + +void +DhtProxyClient::confirmProxy() +{ + if (serverHost_.empty()) return; + getConnectivityStatus(); +} + +void +DhtProxyClient::startProxy() +{ + if (serverHost_.empty()) return; + DHT_LOG.WARN("Staring proxy client to %s", serverHost_.c_str()); + nextProxyConfirmation = scheduler.add(scheduler.time(), std::bind(&DhtProxyClient::confirmProxy, this)); +} + +DhtProxyClient::~DhtProxyClient() +{ + cancelAllOperations(); + cancelAllListeners(); +} + +void +DhtProxyClient::cancelAllOperations() +{ + std::lock_guard lock(lockOperations_); + auto operation = operations_.begin(); + while (operation != operations_.end()) { + if (operation->thread.joinable()) { + // Close connection to stop operation? + restbed::Http::close(operation->req); + operation->thread.join(); + operation = operations_.erase(operation); + } else { + ++operation; + } + } +} + +void +DhtProxyClient::cancelAllListeners() +{ + std::lock_guard lock(lockListener_); + for (auto& listener: listeners_) { + if (listener.thread.joinable()) { + // Close connection to stop listener? 
+ if (listener.req) + restbed::Http::close(listener.req); + listener.thread.join(); + } + } +} + +void +DhtProxyClient::shutdown(ShutdownCallback cb) +{ + cancelAllOperations(); + cancelAllListeners(); + if (cb) + cb(); +} + +NodeStatus +DhtProxyClient::getStatus(sa_family_t af) const +{ + std::lock_guard l(lockCurrentProxyInfos_); + switch (af) + { + case AF_INET: + return statusIpv4_; + case AF_INET6: + return statusIpv6_; + default: + return NodeStatus::Disconnected; + } +} + +bool +DhtProxyClient::isRunning(sa_family_t af) const +{ + std::lock_guard l(lockCurrentProxyInfos_); + switch (af) + { + case AF_INET: + return statusIpv4_ != NodeStatus::Disconnected; + case AF_INET6: + return statusIpv6_ != NodeStatus::Disconnected; + default: + return false; + } +} + +time_point +DhtProxyClient::periodic(const uint8_t*, size_t, const SockAddr&) +{ + // Exec all currently stored callbacks + scheduler.syncTime(); + if (!callbacks_.empty()) { + std::lock_guard lock(lockCallbacks); + for (auto& callback : callbacks_) + callback(); + callbacks_.clear(); + } + // Remove finished operations + { + std::lock_guard lock(lockOperations_); + auto operation = operations_.begin(); + while (operation != operations_.end()) { + if (*(operation->finished)) { + if (operation->thread.joinable()) { + // Close connection to stop operation? + restbed::Http::close(operation->req); + operation->thread.join(); + } + operation = operations_.erase(operation); + } else { + ++operation; + } + } + } + return scheduler.run(); +} + +void +DhtProxyClient::get(const InfoHash& key, const GetCallback& cb, DoneCallback donecb, const Value::Filter& filterChain) +{ + restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + key.toString()); + auto req = std::make_shared(uri); + + auto finished = std::make_shared(false); + Operation o; + o.req = req; + o.finished = finished; + o.thread = std::thread([=](){ + // Try to contact the proxy and set the status to connected when done. 
+ // will change the connectivity status + auto ok = std::make_shared(true); + restbed::Http::async(req, + [=](const std::shared_ptr& req, + const std::shared_ptr& reply) { + auto code = reply->get_status_code(); + + if (code == 200) { + try { + while (restbed::Http::is_open(req) and not *finished) { + restbed::Http::fetch("\n", reply); + if (*finished) + break; + std::string body; + reply->get_body(body); + reply->set_body(""); // Reset the body for the next fetch + + std::string err; + Json::Value json; + Json::CharReaderBuilder rbuilder; + auto* char_data = reinterpret_cast(&body[0]); + auto reader = std::unique_ptr(rbuilder.newCharReader()); + if (reader->parse(char_data, char_data + body.size(), &json, &err)) { + auto value = std::make_shared(json); + if ((not filterChain or filterChain(*value)) && cb) { + std::lock_guard lock(lockCallbacks); + callbacks_.emplace_back([cb, value, finished]() { + if (not cb({value})) + *finished = true; + }); + loopSignal_(); + } + } else { + *ok = false; + } + } + } catch (std::runtime_error& e) { } + } else { + *ok = false; + } + }).wait(); + if (donecb) { + std::lock_guard lock(lockCallbacks); + callbacks_.emplace_back([=](){ + donecb(*ok, {}); + }); + loopSignal_(); + } + if (!ok) { + // Connection failed, update connectivity + opFailed(); + } + *finished = true; + }); + { + std::lock_guard lock(lockOperations_); + operations_.emplace_back(std::move(o)); + } +} + +void +DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, + Value::Filter&& filter, Where&& where) +{ + Query query {{}, where}; + auto filterChain = filter.chain(query.where.getFilter()); + get(key, cb, donecb, filterChain); +} + +void +DhtProxyClient::put(const InfoHash& key, Sp val, DoneCallback cb, time_point, bool permanent) +{ + restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + key.toString()); + auto req = std::make_shared(uri); + req->set_method("POST"); + auto json = val->toJson(); + if (permanent) + json["permanent"] = true; + 
Json::StreamWriterBuilder wbuilder; + wbuilder["commentStyle"] = "None"; + wbuilder["indentation"] = ""; + auto body = Json::writeString(wbuilder, json) + "\n"; + req->set_body(body); + req->set_header("Content-Length", std::to_string(body.size())); + + auto finished = std::make_shared(false); + Operation o; + o.req = req; + o.finished = finished; + o.thread = std::thread([=](){ + auto ok = std::make_shared(true); + restbed::Http::async(req, + [this, ok](const std::shared_ptr& /*req*/, + const std::shared_ptr& reply) { + auto code = reply->get_status_code(); + + if (code == 200) { + restbed::Http::fetch("\n", reply); + std::string body; + reply->get_body(body); + reply->set_body(""); // Reset the body for the next fetch + + try { + std::string err; + Json::Value json; + Json::CharReaderBuilder rbuilder; + auto* char_data = reinterpret_cast(&body[0]); + auto reader = std::unique_ptr(rbuilder.newCharReader()); + if (not reader->parse(char_data, char_data + body.size(), &json, &err)) + *ok = false; + } catch (...) { + *ok = false; + } + } else { + *ok = false; + } + }).wait(); + if (cb) { + std::lock_guard lock(lockCallbacks); + callbacks_.emplace_back([=](){ + cb(*ok, {}); + }); + loopSignal_(); + } + if (!ok) { + // Connection failed, update connectivity + opFailed(); + } + *finished = true; + }); + { + std::lock_guard lock(lockOperations_); + operations_.emplace_back(std::move(o)); + } +} + +NodeStats +DhtProxyClient::getNodesStats(sa_family_t af) const +{ + return af == AF_INET ? 
stats4_ : stats6_; +} + +void +DhtProxyClient::getProxyInfos() +{ + DHT_LOG.DEBUG("Requesting proxy server node information"); + + if (ongoingStatusUpdate_.test_and_set()) + return; + + { + std::lock_guard l(lockCurrentProxyInfos_); + if (statusIpv4_ == NodeStatus::Disconnected) + statusIpv4_ = NodeStatus::Connecting; + if (statusIpv6_ == NodeStatus::Disconnected) + statusIpv6_ = NodeStatus::Connecting; + } + + restbed::Uri uri(HTTP_PROTO + serverHost_ + "/"); + auto req = std::make_shared(uri); + + // Try to contact the proxy and set the status to connected when done. + // will change the connectivity status + statusThread_ = std::thread([this, req]{ + restbed::Http::async(req, + [this](const std::shared_ptr&, + const std::shared_ptr& reply) { + auto code = reply->get_status_code(); + Json::Value proxyInfos; + if (code == 200) { + restbed::Http::fetch("\n", reply); + std::string body; + reply->get_body(body); + + std::string err; + Json::CharReaderBuilder rbuilder; + auto reader = std::unique_ptr(rbuilder.newCharReader()); + try { + reader->parse(body.data(), body.data() + body.size(), &proxyInfos, &err); + } catch (...) 
{ + } + } + onProxyInfos(proxyInfos); + ongoingStatusUpdate_.clear(); + }); + }); + statusThread_.detach(); +} + +void +DhtProxyClient::onProxyInfos(const Json::Value& proxyInfos) +{ + std::lock_guard l(lockCurrentProxyInfos_); + + auto oldStatus = std::max(statusIpv4_, statusIpv6_); + + try { + myid = InfoHash(proxyInfos["node_id"].asString()); + + stats4_ = NodeStats(proxyInfos["ipv4"]); + if (stats4_.good_nodes) + statusIpv4_ = NodeStatus::Connected; + else if (stats4_.dubious_nodes) + statusIpv4_ = NodeStatus::Connecting; + else + statusIpv4_ = NodeStatus::Disconnected; + + stats6_ = NodeStats(proxyInfos["ipv6"]); + if (stats6_.good_nodes) + statusIpv6_ = NodeStatus::Connected; + else if (stats6_.dubious_nodes) + statusIpv6_ = NodeStatus::Connecting; + else + statusIpv6_ = NodeStatus::Disconnected; + + publicAddress_ = parsePublicAddress(proxyInfos["public_ip"]); + } catch (...) {} + + auto newStatus = std::max(statusIpv4_, statusIpv6_); + + if (newStatus == NodeStatus::Connecting || newStatus == NodeStatus::Connected) { + if (oldStatus == NodeStatus::Disconnected) { + restartListeners(); + } + scheduler.edit(nextProxyConfirmation, scheduler.time() + std::chrono::minutes(15)); + } + else if (newStatus == NodeStatus::Disconnected) { + scheduler.edit(nextProxyConfirmation, scheduler.time() + std::chrono::minutes(1)); + } + loopSignal_(); +} + +SockAddr +DhtProxyClient::parsePublicAddress(const Json::Value& val) +{ + auto public_ip = val.asString(); + auto endIp = public_ip.find_last_of(':'); + std::string service = public_ip.substr(endIp + 1); + std::string address = public_ip.substr(0, endIp - 1); + auto sa = SockAddr::resolve(address, service); + if (sa.empty()) return {}; + return sa.front().getMappedIPv4(); +} + +std::vector +DhtProxyClient::getPublicAddress(sa_family_t family) +{ + std::lock_guard l(lockCurrentProxyInfos_); + if (not publicAddress_) return {}; + return publicAddress_.getFamily() == family ? 
std::vector{publicAddress_} : std::vector{}; +} + +size_t +DhtProxyClient::listen(const InfoHash& key, GetCallback cb, Value::Filter filter, Where where) +{ + restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + key.toString()); + auto req = std::make_shared(uri); + req->set_method(deviceKey_.empty() ? "LISTEN" : "SUBSCRIBE"); + + Query query {{}, where}; + auto filterChain = filter.chain(query.where.getFilter()); + auto pushNotifToken = std::make_shared(0); + + unsigned callbackId = 0; + { + std::lock_guard lock(lockCallback_); + callbackId_ += 1; + callbackId = callbackId_; + } + + Listener l; + ++listener_token_; + l.key = key.toString(); + l.token = listener_token_; + l.req = req; + l.cb = cb; + l.callbackId = callbackId; + l.pushNotifToken = pushNotifToken; + l.filterChain = std::move(filterChain); + l.thread = std::thread([=]() + { + auto settings = std::make_shared(); + if (deviceKey_.empty()) { + std::chrono::milliseconds timeout(std::numeric_limits::max()); + settings->set_connection_timeout(timeout); // Avoid the client to close the socket after 5 seconds. 
+ } +#if OPENDHT_PUSH_NOTIFICATIONS + else + fillBodyToGetToken(req, callbackId); +#endif + + struct State { + std::atomic_bool ok {true}; + std::atomic_bool cancel {false}; + }; + auto state = std::make_shared(); + restbed::Http::async(req, + [this, filterChain, cb, pushNotifToken, state](const std::shared_ptr& req, + const std::shared_ptr& reply) { + auto code = reply->get_status_code(); + if (code == 200) { + try { + std::string err; + Json::Value json; + Json::CharReaderBuilder rbuilder; + auto reader = std::unique_ptr(rbuilder.newCharReader()); + if (!deviceKey_.empty()) { + restbed::Http::fetch("\n", reply); + if (state->cancel) + return; + std::string body; + reply->get_body(body); + + auto* char_data = reinterpret_cast(&body[0]); + if (reader->parse(char_data, char_data + body.size(), &json, &err)) { + if (!json.isMember("token")) return; + *pushNotifToken = json["token"].asLargestUInt(); + } else { + state->ok = false; + } + } else { + while (restbed::Http::is_open(req) and not state->cancel) { + restbed::Http::fetch("\n", reply); + if (state->cancel) + break; + std::string body; + reply->get_body(body); + reply->set_body(""); // Reset the body for the next fetch + + auto* char_data = reinterpret_cast(&body[0]); + if (reader->parse(char_data, char_data + body.size(), &json, &err)) { + auto value = std::make_shared(json); + if ((not filterChain or filterChain(*value)) && cb) { + std::lock_guard lock(lockCallbacks); + callbacks_.emplace_back([cb, value, state]() { + if (not state->cancel and not cb({value})) { + state->cancel = true; + } + }); + loopSignal_(); + } + } else { + state->ok = false; + } + } + } + } catch (std::runtime_error&) { + state->ok = false; + } + } else { + state->ok = false; + } + }, settings).get(); + if (not state->ok) { + opFailed(); + } + } + ); + { + std::lock_guard lock(lockListener_); + listeners_.emplace_back(std::move(l)); + } + return listener_token_; +} + +bool +DhtProxyClient::cancelListen(const InfoHash&, size_t token) +{ + 
std::lock_guard lock(lockListener_); + for (auto it = listeners_.begin(); it != listeners_.end(); ++it) { + auto& listener = *it; + if (listener.token == token) { + if (!deviceKey_.empty()) { + // First, be sure to have a token + if (listener.thread.joinable()) { + listener.thread.join(); + } + // UNSUBSCRIBE + restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + listener.key); + auto req = std::make_shared(uri); + req->set_method("UNSUBSCRIBE"); + // fill request body + Json::Value body; + body["key"] = deviceKey_; + body["client_id"] = pushClientId_; + body["token"] = std::to_string(token); + body["callback_id"] = listener.callbackId; + Json::StreamWriterBuilder wbuilder; + wbuilder["commentStyle"] = "None"; + wbuilder["indentation"] = ""; + auto content = Json::writeString(wbuilder, body) + "\n"; + std::replace(content.begin(), content.end(), '\n', ' '); + req->set_body(content); + req->set_header("Content-Length", std::to_string(content.size())); + + restbed::Http::async(req, + [](const std::shared_ptr&, + const std::shared_ptr&){} + ); + // And remove + listeners_.erase(it); + return true; + } else { + // Just stop the request + if (listener.thread.joinable()) { + // Close connection to stop listener? 
+ if (listener.req) + restbed::Http::close(listener.req); + listener.thread.join(); + listeners_.erase(it); + return true; + } + } + } + } + return false; +} + +void +DhtProxyClient::opFailed() +{ + DHT_LOG.ERR("Proxy request failed"); + { + std::lock_guard l(lockCurrentProxyInfos_); + statusIpv4_ = NodeStatus::Disconnected; + statusIpv6_ = NodeStatus::Disconnected; + } + getConnectivityStatus(); + loopSignal_(); +} + +void +DhtProxyClient::getConnectivityStatus() +{ + getProxyInfos(); +} + +void +DhtProxyClient::restartListeners() +{ + std::lock_guard lock(lockListener_); + for (auto& listener: listeners_) { + if (listener.thread.joinable()) + listener.thread.join(); + // Redo listen + auto filterChain = listener.filterChain; + auto cb = listener.cb; + restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + listener.key); + auto req = std::make_shared(uri); + req->set_method("LISTEN"); + listener.req = req; + listener.thread = std::thread([this, filterChain, cb, req]() + { + auto settings = std::make_shared(); + std::chrono::milliseconds timeout(std::numeric_limits::max()); + settings->set_connection_timeout(timeout); // Avoid the client to close the socket after 5 seconds. 
+ + auto ok = std::make_shared(true); + restbed::Http::async(req, + [this, filterChain, cb, ok](const std::shared_ptr& req, + const std::shared_ptr& reply) { + auto code = reply->get_status_code(); + + if (code == 200) { + try { + while (restbed::Http::is_open(req)) { + restbed::Http::fetch("\n", reply); + std::string body; + reply->get_body(body); + reply->set_body(""); // Reset the body for the next fetch + + Json::Value json; + std::string err; + Json::CharReaderBuilder rbuilder; + auto* char_data = reinterpret_cast(&body[0]); + auto reader = std::unique_ptr(rbuilder.newCharReader()); + if (reader->parse(char_data, char_data + body.size(), &json, &err)) { + auto value = std::make_shared(json); + if ((not filterChain or filterChain(*value)) && cb) { + auto okCb = std::make_shared>(); + auto futureCb = okCb->get_future(); + { + std::lock_guard lock(lockCallbacks); + callbacks_.emplace_back([cb, value, okCb](){ + okCb->set_value(cb({value})); + }); + loopSignal_(); + } + futureCb.wait(); + if (!futureCb.get()) { + return; + } + } + } + } + } catch (std::runtime_error&) { + // NOTE: Http::close() can occurs here. Ignore this. + } + } else { + *ok = false; + } + }, settings).get(); + if (!ok) opFailed(); + } + ); + } +} + +void +DhtProxyClient::pushNotificationReceived(const std::map& notification) +{ +#if OPENDHT_PUSH_NOTIFICATIONS + try { + auto token = std::stoul(notification.at("token")); + for (const auto& listener: listeners_) { + if (*(listener.pushNotifToken) != token) + continue; + if (notification.find("timeout") == notification.cend()) { + // Wake up daemon and get values + get(InfoHash(listener.key), listener.cb, {}, listener.filterChain); + } else { + // A timeout has occured, we need to relaunch the listener + resubscribe(token); + } + + } + } catch (...) 
{ + + } +#endif +} + +void +DhtProxyClient::resubscribe(const unsigned token) +{ +#if OPENDHT_PUSH_NOTIFICATIONS + if (deviceKey_.empty()) return; + for (auto& listener: listeners_) { + if (*(listener.pushNotifToken) == token) { + // Subscribe + restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + listener.key); + auto req = std::make_shared(uri); + req->set_method("SUBSCRIBE"); + + auto pushNotifToken = std::make_shared(0); + + if (listener.thread.joinable()) + listener.thread.join(); + listener.req = req; + listener.pushNotifToken = pushNotifToken; + auto callbackId = listener.callbackId; + listener.thread = std::thread([=]() + { + fillBodyToGetToken(req, callbackId); + auto settings = std::make_shared(); + auto ok = std::make_shared(true); + restbed::Http::async(req, + [this, pushNotifToken, ok](const std::shared_ptr&, + const std::shared_ptr& reply) { + auto code = reply->get_status_code(); + if (code == 200) { + try { + restbed::Http::fetch("\n", reply); + std::string body; + reply->get_body(body); + + std::string err; + Json::Value json; + Json::CharReaderBuilder rbuilder; + auto* char_data = reinterpret_cast(&body[0]); + auto reader = std::unique_ptr(rbuilder.newCharReader()); + if (reader->parse(char_data, char_data + body.size(), &json, &err)) { + if (!json.isMember("token")) return; + *pushNotifToken = json["token"].asLargestUInt(); + } + } catch (std::runtime_error&) { + // NOTE: Http::close() can occurs here. Ignore this. 
+ } + } else { + *ok = false; + } + }, settings).get(); + if (!ok) opFailed(); + }); + } + } +#endif +} + +#if OPENDHT_PUSH_NOTIFICATIONS +void +DhtProxyClient::fillBodyToGetToken(std::shared_ptr req, unsigned callbackId) +{ + // Fill body with + // { + // "key":"device_key", + // "callback_id": xxx + // } + Json::Value body; + body["key"] = deviceKey_; + body["client_id"] = pushClientId_; + body["callback_id"] = callbackId; +#ifdef __ANDROID__ + body["platform"] = "android"; +#endif +#ifdef __APPLE__ + body["platform"] = "apple"; +#endif + Json::StreamWriterBuilder wbuilder; + wbuilder["commentStyle"] = "None"; + wbuilder["indentation"] = ""; + auto content = Json::writeString(wbuilder, body) + "\n"; + std::replace(content.begin(), content.end(), '\n', ' '); + req->set_body(content); + req->set_header("Content-Length", std::to_string(content.size())); +} +#endif // OPENDHT_PUSH_NOTIFICATIONS + +} // namespace dht + +#endif // OPENDHT_PROXY_CLIENT diff -Nru opendht-1.5.0/src/dht_proxy_server.cpp opendht-1.6.0/src/dht_proxy_server.cpp --- opendht-1.5.0/src/dht_proxy_server.cpp 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/src/dht_proxy_server.cpp 2018-02-26 22:19:32.000000000 +0000 @@ -1,6 +1,6 @@ /* - * Copyright (C) 2017 Savoir-faire Linux Inc. - * Author : Sébastien Blin + * Copyright (C) 2017-2018 Savoir-faire Linux Inc. 
+ * Author: Sébastien Blin * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -28,16 +28,30 @@ #include #include +#if OPENDHT_PUSH_NOTIFICATIONS +constexpr int const TIMEOUT {6 * 60 * 60}; // in seconds (so six hours) +constexpr const char* const HTTP_PROTO {"http://"}; // TODO, https for prod +#endif //OPENDHT_PUSH_NOTIFICATIONS + using namespace std::placeholders; namespace dht { -DhtProxyServer::DhtProxyServer(std::shared_ptr dht, in_port_t port) -: dht_(dht) +DhtProxyServer::DhtProxyServer(std::shared_ptr dht, in_port_t port , const std::string& pushServer) +: dht_(dht) , pushServer_(pushServer) { // NOTE in c++14, use make_unique service_ = std::unique_ptr(new restbed::Service()); + std::cout << "Running DHT proxy server on port " << port << std::endl; + if (not pushServer.empty()) { +#if !OPENDHT_PUSH_NOTIFICATIONS + std::cerr << "Push server defined but built OpenDHT built without push notification support" << std::endl; +#else + std::cout << "Using push notification server: " << pushServer << std::endl; +#endif + } + server_thread = std::thread([this, port]() { // Create endpoints auto resource = std::make_shared(); @@ -48,6 +62,10 @@ resource->set_path("/{hash: .*}"); resource->set_method_handler("GET", std::bind(&DhtProxyServer::get, this, _1)); resource->set_method_handler("LISTEN", std::bind(&DhtProxyServer::listen, this, _1)); +#if OPENDHT_PUSH_NOTIFICATIONS + resource->set_method_handler("SUBSCRIBE", std::bind(&DhtProxyServer::subscribe, this, _1)); + resource->set_method_handler("UNSUBSCRIBE", std::bind(&DhtProxyServer::unsubscribe, this, _1)); +#endif //OPENDHT_PUSH_NOTIFICATIONS resource->set_method_handler("POST", std::bind(&DhtProxyServer::put, this, _1)); #if OPENDHT_PROXY_SERVER_IDENTITY resource->set_method_handler("SIGN", std::bind(&DhtProxyServer::putSigned, this, _1)); @@ -68,6 +86,8 @@ std::chrono::milliseconds 
timeout(std::numeric_limits::max()); settings->set_connection_timeout(timeout); // there is a timeout, but really huge settings->set_port(port); + auto maxThreads = std::thread::hardware_concurrency() - 1; + settings->set_worker_limit(maxThreads > 1 ? maxThreads : 1); try { service_->start(settings); } catch(std::system_error& e) { @@ -76,24 +96,22 @@ }); listenThread_ = std::thread([this]() { - auto stop = false; - while (!stop) { - auto listener = currentListeners_.begin(); - while (listener != currentListeners_.end()) { - if (listener->session->is_closed() && dht_) { - dht_->cancelListen(listener->hash, std::move(listener->token)); - // Remove listener if unused - listener = currentListeners_.erase(listener); - } else { - ++listener; - } - } - //NOTE: When supports restbed 5.0: service_->is_up() and remove stopListeners - stop = stopListeners; - if (!stop) - std::this_thread::sleep_for(std::chrono::seconds(1)); + while (!service_->is_up() && !stopListeners) { + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + while (service_->is_up() && !stopListeners) { + removeClosedListeners(); + // add listener from push notifs +#if OPENDHT_PUSH_NOTIFICATIONS + handlePushListeners(); +#endif //OPENDHT_PUSH_NOTIFICATIONS + std::this_thread::sleep_for(std::chrono::seconds(1)); } + // Remove last listeners + removeClosedListeners(false); }); + + dht->forwardAllMessages(true); } DhtProxyServer::~DhtProxyServer() @@ -105,6 +123,14 @@ DhtProxyServer::stop() { service_->stop(); + { + std::lock_guard lock(lockListener_); + auto listener = currentListeners_.begin(); + while (listener != currentListeners_.end()) { + listener->session->close(); + ++ listener; + } + } stopListeners = true; // listenThreads_ will stop because there is no more sessions if (listenThread_.joinable()) @@ -129,8 +155,12 @@ result["node_id"] = dht_->getNodeId().toString(); result["ipv4"] = dht_->getNodesStats(AF_INET).toJson(); result["ipv6"] = dht_->getNodesStats(AF_INET6).toJson(); - 
Json::FastWriter writer; - s->close(restbed::OK, writer.write(result)); + result["public_ip"] = s->get_origin(); // [ipv6:ipv4]:port or ipv4:port + Json::StreamWriterBuilder wbuilder; + wbuilder["commentStyle"] = "None"; + wbuilder["indentation"] = ""; + auto output = Json::writeString(wbuilder, result) + "\n"; + s->close(restbed::OK, output); } else s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}"); @@ -153,10 +183,17 @@ infoHash = InfoHash::get(hash); } s->yield(restbed::OK, "", [=]( const std::shared_ptr< restbed::Session > s) { - dht_->get(infoHash, [s](std::shared_ptr value) { + auto cacheSession = std::weak_ptr(s); + + dht_->get(infoHash, [cacheSession](std::shared_ptr value) { + auto s = cacheSession.lock(); + if (!s) return false; // Send values as soon as we get them - Json::FastWriter writer; - s->yield(writer.write(value->toJson()), [](const std::shared_ptr /*session*/){ }); + Json::StreamWriterBuilder wbuilder; + wbuilder["commentStyle"] = "None"; + wbuilder["indentation"] = ""; + auto output = Json::writeString(wbuilder, value->toJson()) + "\n"; + s->yield(output, [](const std::shared_ptr /*session*/){ }); return true; }, [s](bool /*ok* */) { // Communication is finished @@ -190,21 +227,32 @@ infoHash = InfoHash::get(hash); } s->yield(restbed::OK); - // Handle client deconnection - // NOTE: for now, there is no handler, so we test the session in a thread - // will be the case in restbed 5.0 + // Handle client deconnection + // NOTE: for now, there is no handler, so we test the session in a thread + // will be the case in restbed 5.0 SessionToHashToken listener; listener.session = session; listener.hash = infoHash; - listener.token = dht_->listen(infoHash, [s](std::shared_ptr value) { + // cache the session to avoid an incrementation of the shared_ptr's counter + // else, the session->close() will not close the socket. 
+ auto cacheSession = std::weak_ptr(s); + listener.token = dht_->listen(infoHash, [cacheSession](std::shared_ptr value) { + auto s = cacheSession.lock(); + if (!s) return false; // Send values as soon as we get them if (!s->is_closed()) { - Json::FastWriter writer; - s->yield(writer.write(value->toJson()), [](const std::shared_ptr /*session*/){ }); + Json::StreamWriterBuilder wbuilder; + wbuilder["commentStyle"] = "None"; + wbuilder["indentation"] = ""; + auto output = Json::writeString(wbuilder, value->toJson()) + "\n"; + s->yield(output, [](const std::shared_ptr){ }); } return !s->is_closed(); }); - currentListeners_.emplace_back(std::move(listener)); + { + std::lock_guard lock(lockListener_); + currentListeners_.emplace_back(std::move(listener)); + } } else { session->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}"); } @@ -212,6 +260,210 @@ ); } +#if OPENDHT_PUSH_NOTIFICATIONS +void +DhtProxyServer::subscribe(const std::shared_ptr& session) const +{ + const auto request = session->get_request(); + int content_length = std::stoi(request->get_header("Content-Length", "0")); + auto hash = request->get_path_parameter("hash"); + InfoHash infoHash(hash); + if (!infoHash) + infoHash = InfoHash::get(hash); + session->fetch(content_length, + [=](const std::shared_ptr s, const restbed::Bytes& b) + { + try { + restbed::Bytes buf(b); + std::string strJson(buf.begin(), buf.end()); + + std::string err; + Json::Value root; + Json::CharReaderBuilder rbuilder; + auto* char_data = reinterpret_cast(&strJson[0]); + auto reader = std::unique_ptr(rbuilder.newCharReader()); + if (!reader->parse(char_data, char_data + strJson.size(), &root, &err)) { + s->close(restbed::BAD_REQUEST, "{\"err\":\"Incorrect JSON\"}"); + return; + } + auto pushToken = root["key"].asString(); + if (pushToken.empty()) return; + auto callbackId = root.isMember("callback_id") ? 
root["callback_id"].asLargestUInt() : 0; + auto platform = root["platform"].asString(); + auto isAndroid = platform == "android"; + auto clientId = root.isMember("client_id") ? root["client_id"].asString() : std::string(); + + auto token = 0; + { + std::lock_guard lock(lockListener_); + // Check if listener is already present and refresh timeout if launched + for(auto& listener: pushListeners_) { + if (listener.pushToken == pushToken && listener.hash == infoHash + && listener.callbackId == callbackId) { + if (listener.started) + listener.deadline = std::chrono::steady_clock::now() + + std::chrono::seconds(TIMEOUT); + s->close(restbed::OK, "{\"token\": " + std::to_string(listener.token) + "}\n"); + return; + } + } + // The listener is not found, so add it. + ++tokenPushNotif_; + token = tokenPushNotif_; + PushListener listener; + listener.pushToken = pushToken; + listener.hash = std::move(infoHash); + listener.token = token; + listener.started = false; + listener.callbackId = callbackId; + listener.clientId = clientId; + listener.isAndroid = isAndroid; + pushListeners_.emplace_back(std::move(listener)); + } + s->close(restbed::OK, "{\"token\": " + std::to_string(token) + "}\n"); + } catch (...) 
{ + // do nothing + } + } + ); +} + +void +DhtProxyServer::unsubscribe(const std::shared_ptr& session) const +{ + const auto request = session->get_request(); + int content_length = std::stoi(request->get_header("Content-Length", "0")); + auto hash = request->get_path_parameter("hash"); + InfoHash infoHash(hash); + if (!infoHash) + infoHash = InfoHash::get(hash); + session->fetch(content_length, + [=](const std::shared_ptr s, const restbed::Bytes& b) + { + try { + restbed::Bytes buf(b); + std::string strJson(buf.begin(), buf.end()); + std::string err; + Json::Value root; + Json::CharReaderBuilder rbuilder; + auto* char_data = reinterpret_cast(&strJson[0]); + auto reader = std::unique_ptr(rbuilder.newCharReader()); + if (!reader->parse(char_data, char_data + strJson.size(), &root, &err)) { + s->close(restbed::BAD_REQUEST, "{\"err\":\"Incorrect JSON\"}"); + return; + } + auto pushToken = root["key"].asString(); + if (pushToken.empty()) return; + auto token = std::stoull(root["token"].asString()); + if (token == 0) return; + auto callbackId = root.isMember("callback_id") ? root["callback_id"].asLargestUInt() : 0; + + std::lock_guard lock(lockListener_); + // Check if listener is already present and refresh timeout if launched + auto listener = pushListeners_.begin(); + while (listener != pushListeners_.end()) { + if (listener->pushToken == pushToken && listener->token == token + && listener->hash == infoHash && listener->callbackId == callbackId) { + if (dht_ && listener->started) + dht_->cancelListen(listener->hash, std::move(listener->internalToken.get())); + listener = pushListeners_.erase(listener); + } else { + ++listener; + } + } + } catch (...) 
{ + // do nothing + } + } + ); +} + +void +DhtProxyServer::sendPushNotification(const std::string& token, const Json::Value& json, bool isAndroid) const +{ + restbed::Uri uri(HTTP_PROTO + pushServer_ + "/api/push"); + auto req = std::make_shared(uri); + req->set_method("POST"); + + // NOTE: see https://github.com/appleboy/gorush + Json::Value notification(Json::objectValue); + Json::Value tokens(Json::arrayValue); + tokens[0] = token; + notification["tokens"] = tokens; + notification["platform"] = isAndroid ? 2 : 1; + notification["data"] = json; + notification["priority"] = "high"; + notification["time_to_live"] = 600; + + Json::Value notifications(Json::arrayValue); + notifications[0] = notification; + + Json::Value content; + content["notifications"] = notifications; + + Json::StreamWriterBuilder wbuilder; + wbuilder["commentStyle"] = "None"; + wbuilder["indentation"] = ""; + auto valueStr = Json::writeString(wbuilder, content); + + req->set_header("Content-Type", "application/json"); + req->set_header("Accept", "*/*"); + req->set_header("Host", pushServer_); + req->set_header("Content-Length", std::to_string(valueStr.length())); + req->set_body(valueStr); + // Send request. + restbed::Http::async(req, {}); +} + +void +DhtProxyServer::handlePushListeners() +{ + std::lock_guard lock(lockListener_); + auto pushListener = pushListeners_.begin(); + while (pushListener != pushListeners_.end()) { + if (dht_ && !pushListener->started) { + // Try to start unstarted listeners + auto key = pushListener->pushToken; + auto token = pushListener->token; + auto callbackId = pushListener->callbackId; + auto isAndroid = pushListener->isAndroid; + auto clientId = pushListener->clientId; + pushListener->internalToken = dht_->listen(pushListener->hash, + [this, key, callbackId, token, isAndroid, clientId](std::vector> /*value*/) { + // Build message content. 
+ Json::Value json; + if (callbackId > 0) { + json["callback_id"] = callbackId; + } + json["to"] = clientId; + json["token"] = token; + sendPushNotification(key, json, isAndroid); + return true; + } + ); + pushListener->deadline = std::chrono::steady_clock::now() + std::chrono::seconds(TIMEOUT); + pushListener->started = true; + pushListener++; + } else if (dht_ && pushListener->started && pushListener->deadline < std::chrono::steady_clock::now()) { + // Cancel listen if deadline has been reached + dht_->cancelListen(pushListener->hash, std::move(pushListener->internalToken.get())); + // Send a push notification to inform the client that this listen has timeout + Json::Value json; + json["timeout"] = pushListener->hash.toString(); + if (pushListener->callbackId > 0) { + json["callback_id"] = pushListener->callbackId; + } + json["to"] = pushListener->clientId; + json["token"] = pushListener->token; + sendPushNotification(pushListener->pushToken, json, pushListener->isAndroid); + pushListener = pushListeners_.erase(pushListener); + } else { + pushListener++; + } + } +} +#endif //OPENDHT_PUSH_NOTIFICATIONS + void DhtProxyServer::put(const std::shared_ptr& session) const { @@ -231,22 +483,30 @@ s->close(restbed::BAD_REQUEST, response); } else { restbed::Bytes buf(b); - Json::Value root; - Json::Reader reader; std::string strJson(buf.begin(), buf.end()); - bool parsingSuccessful = reader.parse(strJson.c_str(), root); - if (parsingSuccessful) { + + std::string err; + Json::Value root; + Json::CharReaderBuilder rbuilder; + auto* char_data = reinterpret_cast(&strJson[0]); + auto reader = std::unique_ptr(rbuilder.newCharReader()); + if (reader->parse(char_data, char_data + strJson.size(), &root, &err)) { // Build the Value from json auto value = std::make_shared(root); + auto permanent = root.isMember("permanent") ? 
root["permanent"].asBool() : false; dht_->put(infoHash, value, [s, value](bool ok) { if (ok) { - Json::FastWriter writer; - s->close(restbed::OK, writer.write(value->toJson())); + Json::StreamWriterBuilder wbuilder; + wbuilder["commentStyle"] = "None"; + wbuilder["indentation"] = ""; + if (s->is_open()) + s->close(restbed::OK, Json::writeString(wbuilder, value->toJson()) + "\n"); } else { - s->close(restbed::BAD_GATEWAY, "{\"err\":\"put failed\"}"); + if (s->is_open()) + s->close(restbed::BAD_GATEWAY, "{\"err\":\"put failed\"}"); } - }); + }, time_point::max(), permanent); } else { s->close(restbed::BAD_REQUEST, "{\"err\":\"Incorrect JSON\"}"); } @@ -278,16 +538,22 @@ s->close(restbed::BAD_REQUEST, response); } else { restbed::Bytes buf(b); - Json::Value root; - Json::Reader reader; std::string strJson(buf.begin(), buf.end()); - bool parsingSuccessful = reader.parse(strJson.c_str(), root); - if (parsingSuccessful) { + + std::string err; + Json::Value root; + Json::CharReaderBuilder rbuilder; + auto* char_data = reinterpret_cast(&strJson[0]); + auto reader = std::unique_ptr(rbuilder.newCharReader()); + if (reader->parse(char_data, char_data + strJson.size(), &root, &err)) { auto value = std::make_shared(root); - Json::FastWriter writer; + Json::StreamWriterBuilder wbuilder; + wbuilder["commentStyle"] = "None"; + wbuilder["indentation"] = ""; + auto output = Json::writeString(wbuilder, value->toJson()) + "\n"; dht_->putSigned(infoHash, value); - s->close(restbed::OK, writer.write(value->toJson())); + s->close(restbed::OK, output); } else { s->close(restbed::BAD_REQUEST, "{\"err\":\"Incorrect JSON\"}"); } @@ -318,16 +584,23 @@ s->close(restbed::BAD_REQUEST, response); } else { restbed::Bytes buf(b); - Json::Value root; - Json::Reader reader; std::string strJson(buf.begin(), buf.end()); - bool parsingSuccessful = reader.parse(strJson.c_str(), root); + + std::string err; + Json::Value root; + Json::CharReaderBuilder rbuilder; + auto* char_data = 
reinterpret_cast(&strJson[0]); + auto reader = std::unique_ptr(rbuilder.newCharReader()); + bool parsingSuccessful = reader->parse(char_data, char_data + strJson.size(), &root, &err); InfoHash to(root["to"].asString()); if (parsingSuccessful && to) { auto value = std::make_shared(root); - Json::FastWriter writer; + Json::StreamWriterBuilder wbuilder; + wbuilder["commentStyle"] = "None"; + wbuilder["indentation"] = ""; + auto output = Json::writeString(wbuilder, value->toJson()) + "\n"; dht_->putEncrypted(key, to, value); - s->close(restbed::OK, writer.write(value->toJson())); + s->close(restbed::OK, output); } else { if(!parsingSuccessful) s->close(restbed::BAD_REQUEST, "{\"err\":\"Incorrect JSON\"}"); @@ -374,8 +647,11 @@ s->yield(restbed::OK, "", [=]( const std::shared_ptr< restbed::Session > s) { dht_->get(infoHash, [s](std::shared_ptr v) { // Send values as soon as we get them - Json::FastWriter writer; - s->yield(writer.write(v->toJson()), [](const std::shared_ptr /*session*/){ }); + Json::StreamWriterBuilder wbuilder; + wbuilder["commentStyle"] = "None"; + wbuilder["indentation"] = ""; + auto output = Json::writeString(wbuilder, v->toJson()) + "\n"; + s->yield(output, [](const std::shared_ptr /*session*/){ }); return true; }, [s](bool /*ok* */) { // Communication is finished @@ -389,5 +665,23 @@ ); } +void +DhtProxyServer::removeClosedListeners(bool testSession) +{ + // clean useless listeners + std::lock_guard lock(lockListener_); + auto listener = currentListeners_.begin(); + while (listener != currentListeners_.end()) { + auto cancel = testSession ? 
dht_ && listener->session->is_closed() : static_cast(dht_); + if (cancel) { + dht_->cancelListen(listener->hash, std::move(listener->token.get())); + // Remove listener if unused + listener = currentListeners_.erase(listener); + } else { + ++listener; + } + } +} + } #endif //OPENDHT_PROXY_SERVER diff -Nru opendht-1.5.0/src/dhtrunner.cpp opendht-1.6.0/src/dhtrunner.cpp --- opendht-1.5.0/src/dhtrunner.cpp 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/src/dhtrunner.cpp 2018-02-26 22:19:32.000000000 +0000 @@ -1,7 +1,8 @@ /* * Copyright (C) 2014-2017 Savoir-faire Linux Inc. - * Author(s) : Adrien Béraud - * Simon Désaulniers + * Authors: Adrien Béraud + * Simon Désaulniers + * Sébastien Blin * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -20,6 +21,10 @@ #include "dhtrunner.h" #include "securedht.h" +#if OPENDHT_PROXY_CLIENT +#include "dht_proxy_client.h" +#endif + #ifndef _WIN32 #include #else @@ -39,6 +44,9 @@ constexpr std::chrono::seconds DhtRunner::BOOTSTRAP_PERIOD; DhtRunner::DhtRunner() : dht_() +#if OPENDHT_PROXY_CLIENT +, dht_via_proxy_() +#endif //OPENDHT_PROXY_CLIENT { #ifdef _WIN32 WSADATA wsd; @@ -81,16 +89,28 @@ { if (running) return; - if (rcv_thread.joinable()) - rcv_thread.join(); + startNetwork(local4, local6); + + auto dht = std::unique_ptr(new Dht(s4, s6, SecureDht::getConfig(config.dht_config))); + dht_ = std::unique_ptr(new SecureDht(std::move(dht), config.dht_config)); + +#if OPENDHT_PROXY_CLIENT + config_ = config; +#endif + enableProxy(not config.proxy_server.empty()); + running = true; - doRun(local4, local6, config.dht_config); if (not config.threaded) return; - dht_thread = std::thread([this]() { + dht_thread = std::thread([this, local4, local6]() { while (running) { std::unique_lock lk(dht_mtx); - auto wakeup = loop_(); + time_point wakeup; + try { + wakeup = loop_(); + } catch (const dht::SocketException& e) { + startNetwork(local4, 
local6); + } cv.wait_until(lk, wakeup, [this]() { if (not running) return true; @@ -115,6 +135,10 @@ void DhtRunner::shutdown(ShutdownCallback cb) { +#if OPENDHT_PROXY_CLIENT + if (dht_via_proxy_) + dht_via_proxy_->shutdown(cb); +#endif std::lock_guard lck(storage_mtx); pending_ops_prio.emplace([=](SecureDht& dht) mutable { dht.shutdown(cb); @@ -125,15 +149,16 @@ void DhtRunner::join() { + running_network = false; running = false; cv.notify_all(); bootstrap_cv.notify_all(); if (dht_thread.joinable()) dht_thread.join(); - if (rcv_thread.joinable()) - rcv_thread.join(); if (bootstrap_thread.joinable()) bootstrap_thread.join(); + if (rcv_thread.joinable()) + rcv_thread.join(); { std::lock_guard lck(storage_mtx); @@ -142,11 +167,9 @@ } { std::lock_guard lck(dht_mtx); - dht_.reset(); + resetDht(); status4 = NodeStatus::Disconnected; status6 = NodeStatus::Disconnected; - bound4 = {}; - bound6 = {}; } } @@ -154,23 +177,23 @@ DhtRunner::dumpTables() const { std::lock_guard lck(dht_mtx); - dht_->dumpTables(); + activeDht()->dumpTables(); } InfoHash DhtRunner::getId() const { - if (!dht_) + if (!activeDht()) return {}; - return dht_->getId(); + return activeDht()->getId(); } InfoHash DhtRunner::getNodeId() const { - if (!dht_) + if (!activeDht()) return {}; - return dht_->getNodeId(); + return activeDht()->getNodeId(); } @@ -209,19 +232,30 @@ void DhtRunner::setLoggers(LogMethod error, LogMethod warn, LogMethod debug) { std::lock_guard lck(dht_mtx); - dht_->setLoggers(std::forward(error), std::forward(warn), std::forward(debug)); + if (dht_) + dht_->setLoggers(std::forward(error), std::forward(warn), std::forward(debug)); +#if OPENDHT_PROXY_CLIENT + if (dht_via_proxy_) + dht_via_proxy_->setLoggers(std::forward(error), std::forward(warn), std::forward(debug)); +#endif } void DhtRunner::setLogFilter(const InfoHash& f) { std::lock_guard lck(dht_mtx); - dht_->setLogFilter(f); + activeDht()->setLogFilter(f); + if (dht_) + dht_->setLogFilter(f); +#if OPENDHT_PROXY_CLIENT + if 
(dht_via_proxy_) + dht_via_proxy_->setLogFilter(f); +#endif } void DhtRunner::registerType(const ValueType& type) { std::lock_guard lck(dht_mtx); - dht_->registerType(type); + activeDht()->registerType(type); } void @@ -234,7 +268,7 @@ DhtRunner::getNodesStats(sa_family_t af, unsigned *good_return, unsigned *dubious_return, unsigned *cached_return, unsigned *incoming_return) const { std::lock_guard lck(dht_mtx); - const auto stats = dht_->getNodesStats(af); + const auto stats = activeDht()->getNodesStats(af); if (good_return) *good_return = stats.good_nodes; if (dubious_return) @@ -250,51 +284,51 @@ DhtRunner::getNodesStats(sa_family_t af) const { std::lock_guard lck(dht_mtx); - return dht_->getNodesStats(af); + return activeDht()->getNodesStats(af); } std::vector DhtRunner::getNodeMessageStats(bool in) const { std::lock_guard lck(dht_mtx); - return dht_->getNodeMessageStats(in); + return activeDht()->getNodeMessageStats(in); } std::string DhtRunner::getStorageLog() const { std::lock_guard lck(dht_mtx); - return dht_->getStorageLog(); + return activeDht()->getStorageLog(); } std::string DhtRunner::getStorageLog(const InfoHash& f) const { std::lock_guard lck(dht_mtx); - return dht_->getStorageLog(f); + return activeDht()->getStorageLog(f); } std::string DhtRunner::getRoutingTablesLog(sa_family_t af) const { std::lock_guard lck(dht_mtx); - return dht_->getRoutingTablesLog(af); + return activeDht()->getRoutingTablesLog(af); } std::string DhtRunner::getSearchesLog(sa_family_t af) const { std::lock_guard lck(dht_mtx); - return dht_->getSearchesLog(af); + return activeDht()->getSearchesLog(af); } std::string DhtRunner::getSearchLog(const InfoHash& f, sa_family_t af) const { std::lock_guard lck(dht_mtx); - return dht_->getSearchLog(f, af); + return activeDht()->getSearchLog(f, af); } std::vector DhtRunner::getPublicAddress(sa_family_t af) { std::lock_guard lck(dht_mtx); - return dht_->getPublicAddress(af); + return activeDht()->getPublicAddress(af); } std::vector 
DhtRunner::getPublicAddressStr(sa_family_t af) @@ -308,18 +342,24 @@ void DhtRunner::registerCertificate(std::shared_ptr cert) { std::lock_guard lck(dht_mtx); - dht_->registerCertificate(cert); + activeDht()->registerCertificate(cert); } void DhtRunner::setLocalCertificateStore(CertificateStoreQuery&& query_method) { std::lock_guard lck(dht_mtx); - dht_->setLocalCertificateStore(std::forward(query_method)); +#if OPENDHT_PROXY_CLIENT + if (dht_via_proxy_) + dht_via_proxy_->setLocalCertificateStore(std::forward(query_method)); +#endif + if (dht_) + dht_->setLocalCertificateStore(std::forward(query_method)); } time_point DhtRunner::loop_() { - if (!dht_) + auto dht = activeDht(); + if (not dht) return {}; decltype(pending_ops) ops {}; @@ -330,7 +370,7 @@ std::move(pending_ops) : std::move(pending_ops_prio); } while (not ops.empty()) { - ops.front()(*dht_); + ops.front()(*dht); ops.pop(); } @@ -345,15 +385,15 @@ for (const auto& pck : received) { auto& buf = pck.first; auto& from = pck.second; - wakeup = dht_->periodic(buf.data(), buf.size()-1, from); + wakeup = dht->periodic(buf.data(), buf.size()-1, from); } received.clear(); } else { - wakeup = dht_->periodic(nullptr, 0, nullptr, 0); + wakeup = dht->periodic(nullptr, 0, nullptr, 0); } - NodeStatus nstatus4 = dht_->getStatus(AF_INET); - NodeStatus nstatus6 = dht_->getStatus(AF_INET6); + NodeStatus nstatus4 = dht->getStatus(AF_INET); + NodeStatus nstatus6 = dht->getStatus(AF_INET6); if (nstatus4 != status4 || nstatus6 != status6) { status4 = nstatus4; status6 = nstatus6; @@ -397,12 +437,13 @@ } void -DhtRunner::doRun(const SockAddr& sin4, const SockAddr& sin6, SecureDht::Config config) +DhtRunner::startNetwork(const SockAddr sin4, const SockAddr sin6) { - dht_.reset(); - - int s4 = -1, - s6 = -1; + running_network = false; + if (rcv_thread.joinable()) + rcv_thread.join(); + s4 = -1; + s6 = -1; bound4 = {}; if (sin4) @@ -414,11 +455,10 @@ s6 = bindSocket(sin6, bound6); #endif - dht_ = std::unique_ptr(new SecureDht {s4, 
s6, config}); - - rcv_thread = std::thread([this,s4,s6]() { + running_network = true; + rcv_thread = std::thread([this]() { try { - while (true) { + while (running_network) { struct timeval tv {/*.tv_sec = */0, /*.tv_usec = */250000}; fd_set readfds; @@ -436,7 +476,7 @@ } } - if(!running) + if (not running_network) break; if(rc > 0) { @@ -466,6 +506,10 @@ close(s4); if (s6 >= 0) close(s6); + s4 = -1; + s6 = -1; + bound4 = {}; + bound6 = {}; }); } @@ -504,7 +548,28 @@ { std::lock_guard lck(storage_mtx); pending_ops.emplace([=](SecureDht& dht) mutable { - ret_token->set_value(dht.listen(hash, vcb, std::move(f), std::move(w))); + auto tokenbGlobal = listener_token_++; + Listener listener {}; + listener.hash = hash; + listener.f = std::move(f); + listener.w = std::move(w); + listener.gcb = [hash,vcb,tokenbGlobal,this](const std::vector>& vals){ + if (not vcb(vals)) { +#if OPENDHT_PROXY_CLIENT + cancelListen(hash, tokenbGlobal); +#endif + return false; + } + return true; + }; +#if OPENDHT_PROXY_CLIENT + if (use_proxy) + listener.tokenProxyDht = dht.listen(hash, listener.gcb, listener.f, listener.w); + else +#endif + listener.tokenClassicDht = dht.listen(hash, listener.gcb, listener.f, listener.w); + listeners_.emplace(tokenbGlobal, std::move(listener)); + ret_token->set_value(tokenbGlobal); }); } cv.notify_all(); @@ -520,12 +585,22 @@ void DhtRunner::cancelListen(InfoHash h, size_t token) { - { - std::lock_guard lck(storage_mtx); - pending_ops.emplace([=](SecureDht& dht) { - dht.cancelListen(h, token); - }); - } + std::lock_guard lck(storage_mtx); +#if OPENDHT_PROXY_CLIENT + pending_ops.emplace([=](SecureDht&) { + auto it = listeners_.find(token); + if (it == listeners_.end()) return; + if (it->second.tokenClassicDht) + dht_->cancelListen(h, it->second.tokenClassicDht); + if (it->second.tokenProxyDht and dht_via_proxy_) + dht_via_proxy_->cancelListen(h, it->second.tokenProxyDht); + listeners_.erase(it); + }); +#else + pending_ops.emplace([=](SecureDht& dht) { + 
dht.cancelListen(h, token); + }); +#endif // OPENDHT_PROXY_CLIENT cv.notify_all(); } @@ -772,4 +847,118 @@ cv.notify_all(); } +void +DhtRunner::resetDht() +{ + listeners_.clear(); +#if OPENDHT_PROXY_CLIENT + dht_via_proxy_.reset(); +#endif // OPENDHT_PROXY_CLIENT + dht_.reset(); +} + +SecureDht* +DhtRunner::activeDht() const +{ +#if OPENDHT_PROXY_CLIENT + return use_proxy? dht_via_proxy_.get() : dht_.get(); +#else + return dht_.get(); +#endif // OPENDHT_PROXY_CLIENT +} + +void +DhtRunner::enableProxy(bool proxify) +{ +#if OPENDHT_PROXY_CLIENT + if (dht_via_proxy_) { + dht_via_proxy_->shutdown({}); + } + if (proxify) { + // Init the proxy client + auto dht_via_proxy = std::unique_ptr( + new DhtProxyClient([this]{ + if (config_.threaded) { + { + std::lock_guard lck(storage_mtx); + pending_ops_prio.emplace([=](SecureDht&) mutable {}); + } + cv.notify_all(); + } + }, config_.proxy_server, config_.push_node_id) + ); + dht_via_proxy_ = std::unique_ptr(new SecureDht(std::move(dht_via_proxy), config_.dht_config)); +#if OPENDHT_PUSH_NOTIFICATIONS + if (not pushToken_.empty()) + dht_via_proxy_->setPushNotificationToken(pushToken_); +#endif + // add current listeners + for (auto& l: listeners_) + l.second.tokenProxyDht = dht_via_proxy_->listen(l.second.hash, l.second.gcb, l.second.f, l.second.w); + // and use it + use_proxy = proxify; + } else { + use_proxy = proxify; + std::lock_guard lck(storage_mtx); + if (not listeners_.empty()) { + pending_ops.emplace([this](SecureDht& /*dht*/) mutable { + if (not dht_) + return; + for (auto& l : listeners_) { + if (not l.second.tokenClassicDht) { + l.second.tokenClassicDht = dht_->listen(l.second.hash, l.second.gcb, l.second.f, l.second.w); + } + } + }); + } + } +#else + if (proxify) + std::cerr << "DHT proxy requested but OpenDHT built without proxy support." 
<< std::endl; +#endif +} + +void +DhtRunner::forwardAllMessages(bool forward) +{ +#if OPENDHT_PROXY_SERVER +#if OPENDHT_PROXY_CLIENT + if (dht_via_proxy_) + dht_via_proxy_->forwardAllMessages(forward); +#endif // OPENDHT_PROXY_CLIENT + if (dht_) + dht_->forwardAllMessages(forward); +#endif // OPENDHT_PROXY_SERVER +} + +/** + * Updates the push notification device token + */ +void +DhtRunner::setPushNotificationToken(const std::string& token) { +#if OPENDHT_PROXY_CLIENT && OPENDHT_PUSH_NOTIFICATIONS + pushToken_ = token; + if (dht_via_proxy_) + dht_via_proxy_->setPushNotificationToken(token); +#endif +} + +void +DhtRunner::pushNotificationReceived(const std::map& data) const +{ +#if OPENDHT_PROXY_CLIENT && OPENDHT_PUSH_NOTIFICATIONS + if (dht_via_proxy_) + dht_via_proxy_->pushNotificationReceived(data); +#endif +} + +void +DhtRunner::resubscribe(unsigned token) +{ +#if OPENDHT_PROXY_CLIENT && OPENDHT_PUSH_NOTIFICATIONS + if (dht_via_proxy_) + dht_via_proxy_->resubscribe(token); +#endif +} + } diff -Nru opendht-1.5.0/src/listener.h opendht-1.6.0/src/listener.h --- opendht-1.5.0/src/listener.h 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/src/listener.h 2018-02-26 22:19:32.000000000 +0000 @@ -27,14 +27,12 @@ * Foreign nodes asking for updates about an InfoHash. 
*/ struct Listener { - size_t sid; time_point time; Query query; - Listener(size_t sid, time_point t, Query&& q) : sid(sid), time(t), query(std::move(q)) {} + Listener(time_point t, Query&& q) : time(t), query(std::move(q)) {} - void refresh(size_t s, time_point t, Query&& q) { - sid = s; + void refresh(time_point t, Query&& q) { time = t; query = std::move(q); } diff -Nru opendht-1.5.0/src/Makefile.am opendht-1.6.0/src/Makefile.am --- opendht-1.5.0/src/Makefile.am 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/src/Makefile.am 2018-02-26 22:19:32.000000000 +0000 @@ -59,6 +59,11 @@ nobase_include_HEADERS += ../include/opendht/dht_proxy_server.h endif +if ENABLE_PROXY_CLIENT +libopendht_la_SOURCES += dht_proxy_client.cpp +nobase_include_HEADERS += ../include/opendht/dht_proxy_client.h ../include/opendht/dht_interface.h +endif + clean-local: rm -rf libargon2.la diff -Nru opendht-1.5.0/src/network_engine.cpp opendht-1.6.0/src/network_engine.cpp --- opendht-1.5.0/src/network_engine.cpp 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/src/network_engine.cpp 2018-02-26 22:19:32.000000000 +0000 @@ -126,8 +126,10 @@ : ntoken(std::move(msg.token)), values(std::move(msg.values)), fields(std::move(msg.fields)), nodes4(std::move(msg.nodes4)), nodes6(std::move(msg.nodes6)) {} -NetworkEngine::NetworkEngine(Logger& log, Scheduler& scheduler) : myid(zeroes), DHT_LOG(log), scheduler(scheduler) {} -NetworkEngine::NetworkEngine(InfoHash& myid, NetId net, int s, int s6, Logger& log, Scheduler& scheduler, +NetworkEngine::NetworkEngine(Logger& log, Scheduler& scheduler, const int& s, const int& s6) + : myid(zeroes), DHT_LOG(log), scheduler(scheduler), dht_socket(s), dht_socket6(s6) +{} +NetworkEngine::NetworkEngine(InfoHash& myid, NetId net, const int& s, const int& s6, Logger& log, Scheduler& scheduler, decltype(NetworkEngine::onError) onError, decltype(NetworkEngine::onNewNode) onNewNode, decltype(NetworkEngine::onReportedAddr) onReportedAddr, @@ -496,7 +498,7 @@ 
sendPong(from, msg->tid); break; case MessageType::FindNode: { - DHT_LOG.d(msg->target, node->id, "[node %s] got 'find' request for %s (%d)", node->toString().c_str(), msg->target.toString().c_str(), msg->want); + //DHT_LOG.d(msg->target, node->id, "[node %s] got 'find' request for %s (%d)", node->toString().c_str(), msg->target.toString().c_str(), msg->want); ++in_stats.find; RequestAnswer answer = onFindNode(node, msg->target, msg->want); auto nnodes = bufferNodes(from.getFamily(), msg->target, msg->want, answer.nodes4, answer.nodes6); @@ -504,7 +506,7 @@ break; } case MessageType::GetValues: { - DHT_LOG.d(msg->info_hash, node->id, "[node %s] got 'get' request for %s", node->toString().c_str(), msg->info_hash.toString().c_str()); + //DHT_LOG.d(msg->info_hash, node->id, "[node %s] got 'get' request for %s", node->toString().c_str(), msg->info_hash.toString().c_str()); ++in_stats.get; RequestAnswer answer = onGetValues(node, msg->info_hash, msg->want, msg->query); auto nnodes = bufferNodes(from.getFamily(), msg->info_hash, msg->want, answer.nodes4, answer.nodes6); @@ -589,6 +591,9 @@ if (sendto(s, buf, len, flags, addr.get(), addr.getLength()) == -1) { int err = errno; DHT_LOG.e("Can't send message to %s: %s", addr.toString().c_str(), strerror(err)); + if (err == EPIPE) { + throw SocketException(EPIPE); + } return err; } return 0; diff -Nru opendht-1.5.0/src/node.cpp opendht-1.6.0/src/node.cpp --- opendht-1.5.0/src/node.cpp 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/src/node.cpp 2018-02-26 22:19:32.000000000 +0000 @@ -30,11 +30,10 @@ constexpr std::chrono::minutes Node::NODE_GOOD_TIME; constexpr std::chrono::seconds Node::MAX_RESPONSE_TIME; - Node::Node(const InfoHash& id, const SockAddr& addr, bool client) : id(id), addr(addr), is_client(client), sockets_() { - crypto::random_device rd; + thread_local crypto::random_device rd {}; transaction_id = std::uniform_int_distribution{1}(rd); } @@ -134,18 +133,18 @@ if (++transaction_id == 0) transaction_id = 1; 
- auto sock = Socket(std::move(cb)); + auto sock = std::make_shared(std::move(cb)); auto s = sockets_.emplace(transaction_id, std::move(sock)); if (not s.second) s.first->second = std::move(sock); return transaction_id; } -Socket* +Sp Node::getSocket(Tid id) { auto it = sockets_.find(id); - return it == sockets_.end() ? nullptr : &it->second; + return it == sockets_.end() ? nullptr : it->second; } void diff -Nru opendht-1.5.0/src/search.h opendht-1.6.0/src/search.h --- opendht-1.5.0/src/search.h 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/src/search.h 2018-02-26 22:19:32.000000000 +0000 @@ -1,5 +1,5 @@ /* - * Copyright (C) 2014-2017 Savoir-faire Linux Inc. + * Copyright (C) 2014-2018 Savoir-faire Linux Inc. * Author(s) : Adrien Béraud * * This program is free software; you can redistribute it and/or modify @@ -74,6 +74,7 @@ time_point last_get_reply {time_point::min()}; /* last time received valid token */ bool candidate {false}; /* A search node is candidate if the search is/was synced and this node is a new candidate for inclusion. */ + Sp syncJob {}; SearchNode() : node() {} SearchNode(const SearchNode&) = delete; @@ -93,11 +94,17 @@ /** * Can we use this node to listen/announce now ? */ - bool isSynced(time_point now) const { + bool isSynced(const time_point& now) const { return not node->isExpired() and not token.empty() and last_get_reply >= now - Node::NODE_EXPIRE_TIME; } + time_point getSyncTime(const time_point& now) const { + if (node->isExpired() or token.empty()) + return now; + return last_get_reply + Node::NODE_EXPIRE_TIME; + } + /** * Could a particular "get" request be sent to this node now ? * @@ -421,8 +428,6 @@ done = true; } - time_point getUpdateTime(time_point now) const; - bool isAnnounced(Value::Id id) const; bool isListening(time_point now) const; @@ -461,26 +466,6 @@ } /** - * Returns the time of the next "announce" event for this search, - * or time_point::max() if no such event is planned. 
- * Only makes sense when the search is synced. - */ - time_point getAnnounceTime(time_point now) const; - - /** - * Returns the time of the next "listen" event for this search, - * or time_point::max() if no such event is planned. - * Only makes sense when the search is synced. - */ - time_point getListenTime(time_point now) const; - - /** - * Returns the time of the next event for this search, - * or time_point::max() if no such event is planned. - */ - time_point getNextStepTime(time_point now) const; - - /** * Removes a node which have been expired for at least * NODE::NODE_EXPIRE_TIME minutes. The search for an expired node starts * from the end. @@ -721,36 +706,6 @@ return true; } -time_point -Dht::Search::getUpdateTime(time_point now) const -{ - time_point ut = time_point::max(); - const auto last_get = getLastGetTime(); - unsigned i = 0, t = 0, d = 0; - const auto solicited_nodes = currentlySolicitedNodeCount(); - for (const auto& sn : nodes) { - if (sn.node->isExpired() or (sn.candidate and t >= TARGET_NODES)) - continue; - auto pending = sn.pendingGet(); - if (sn.last_get_reply < std::max(now - Node::NODE_EXPIRE_TIME, last_get) or pending) { - // not isSynced - if (not pending and solicited_nodes < MAX_REQUESTED_SEARCH_NODES) - ut = std::min(ut, now); - if (not sn.candidate) - d++; - } else - ut = std::min(ut, sn.last_get_reply + Node::NODE_EXPIRE_TIME); - - t++; - if (not sn.candidate and ++i == TARGET_NODES) - break; - } - if (not callbacks.empty() and d == 0) - // If all synced/updated but some callbacks remain, step now to clear them - return now; - return ut; -} - bool Dht::Search::isAnnounced(Value::Id id) const { @@ -790,78 +745,4 @@ return i; } -time_point -Dht::Search::getAnnounceTime(time_point now) const -{ - if (nodes.empty()) - return time_point::max(); - time_point ret {time_point::max()}; - for (const auto& a : announce) { - if (!a.value) continue; - unsigned i = 0, t = 0; - for (const auto& n : nodes) { - if (not n.isSynced(now) or 
(n.candidate and t >= TARGET_NODES)) - continue; - ret = std::min(ret, n.getAnnounceTime(a.value->id)); - t++; - if (not n.candidate and ++i == TARGET_NODES) - break; - } - } - return ret; -} - -time_point -Dht::Search::getListenTime(time_point now) const -{ - if (listeners.empty()) - return time_point::max(); - - time_point listen_time {time_point::max()}; - unsigned i = 0, t = 0; - for (const auto& sn : nodes) { - if (not sn.isSynced(now) or (sn.candidate and t >= LISTEN_NODES)) - continue; - for (auto& l : listeners) - listen_time = std::min(listen_time, sn.getListenTime(l.second.query)); - t++; - if (not sn.candidate and ++i == LISTEN_NODES) - break; - } - return listen_time; -} - -time_point -Dht::Search::getNextStepTime(time_point now) const -{ - auto next_step = time_point::max(); - if (expired or done) - return next_step; - - auto ut = getUpdateTime(now); - if (ut != time_point::max()) { - //std::cout << id.toString() << " IPv" << (af==AF_INET?"4":"6") << " update time in " << print_dt(ut - now) << " s" << std::endl; - next_step = std::min(next_step, ut); - } - - if (isSynced(now)) - { - auto at = getAnnounceTime(now); - if (at != time_point::max()) { - //std::cout << id.toString() << " IPv" << (af==AF_INET?"4":"6") << " announce time in " << print_dt(at - now) << " s" << std::endl; - next_step = std::min(next_step, at); - } - - auto lt = getListenTime(now); - if (lt != time_point::max()) { - //std::cout << id.toString() << " IPv" << (af==AF_INET?"4":"6") << " listen time in " << print_dt(lt - now) << " s" << std::endl; - next_step = std::min(next_step, lt); - } - } - - - return next_step; -} - - } diff -Nru opendht-1.5.0/src/securedht.cpp opendht-1.6.0/src/securedht.cpp --- opendht-1.5.0/src/securedht.cpp 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/src/securedht.cpp 2018-02-26 22:19:32.000000000 +0000 @@ -1,7 +1,8 @@ /* * Copyright (C) 2014-2017 Savoir-faire Linux Inc. 
- * Author : Adrien Béraud + * Authors: Adrien Béraud * Simon Désaulniers + * Sébastien Blin * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -32,20 +33,10 @@ namespace dht { -Config& getConfig(SecureDht::Config& conf) +SecureDht::SecureDht(std::unique_ptr dht, SecureDht::Config conf) +: dht_(std::move(dht)), key_(conf.id.first), certificate_(conf.id.second) { - auto& c = conf.node_config; - if (not c.node_id and conf.id.second) - c.node_id = InfoHash::get("node:"+conf.id.second->getId().toString()); - return c; -} - -SecureDht::SecureDht(int s, int s6, SecureDht::Config conf) -: Dht(s, s6, getConfig(conf)), key_(conf.id.first), certificate_(conf.id.second) -{ - if (s < 0 && s6 < 0) - return; - + if (!dht_) return; for (const auto& type : DEFAULT_TYPES) registerType(type); @@ -59,7 +50,7 @@ if (key_ and certId != key_->getPublicKey().getId()) throw DhtException("SecureDht: provided certificate doesn't match private key."); - Dht::put(certId, Value { + dht_->put(certId, Value { CERTIFICATE_TYPE, *certificate_, 1 @@ -190,7 +181,7 @@ } auto found = std::make_shared(false); - Dht::get(node, [cb,node,found,this](const std::vector>& vals) { + dht_->get(node, [cb,node,found,this](const std::vector>& vals) { if (*found) return false; for (const auto& v : vals) { @@ -238,8 +229,13 @@ for (const auto& v : values) { // Decrypt encrypted values if (v->isEncrypted()) { - if (not key_) + if (not key_) { +#if OPENDHT_PROXY_SERVER + if (forward_all_) // We are currently a proxy, send messages to clients. 
+ tmpvals.push_back(v); +#endif continue; + } try { Value decrypted_val (decrypt(*v)); if (decrypted_val.recipient == getId()) { @@ -277,13 +273,13 @@ void SecureDht::get(const InfoHash& id, GetCallback cb, DoneCallback donecb, Value::Filter&& f, Where&& w) { - Dht::get(id, getCallbackFilter(cb, std::forward(f)), donecb, {}, std::forward(w)); + dht_->get(id, getCallbackFilter(cb, std::forward(f)), donecb, {}, std::forward(w)); } size_t -SecureDht::listen(const InfoHash& id, GetCallback cb, Value::Filter&& f, Where&& w) +SecureDht::listen(const InfoHash& id, GetCallback cb, Value::Filter f, Where w) { - return Dht::listen(id, getCallbackFilter(cb, std::forward(f)), {}, std::forward(w)); + return dht_->listen(id, getCallbackFilter(cb, std::forward(f)), {}, std::forward(w)); } void @@ -295,7 +291,7 @@ } // Check if we are already announcing a value - auto p = getPut(hash, val->id); + auto p = dht_->getPut(hash, val->id); if (p && val->seq <= p->seq) { DHT_LOG.DEBUG("Found previous value being announced."); val->seq = p->seq + 1; @@ -317,7 +313,7 @@ }, [hash,val,this,callback,permanent] (bool /* ok */) { sign(*val); - put(hash, val, callback, time_point::max(), permanent); + dht_->put(hash, val, callback, time_point::max(), permanent); }, Value::IdFilter(val->id) ); @@ -334,7 +330,7 @@ } DHT_LOG.WARN("Encrypting data for PK: %s", pk->getId().toString().c_str()); try { - put(hash, encrypt(*val, *pk), callback, time_point::max(), permanent); + dht_->put(hash, encrypt(*val, *pk), callback, time_point::max(), permanent); } catch (const std::exception& e) { DHT_LOG.ERR("Error putting encrypted data: %s", e.what()); if (callback) diff -Nru opendht-1.5.0/src/utils.cpp opendht-1.6.0/src/utils.cpp --- opendht-1.5.0/src/utils.cpp 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/src/utils.cpp 2018-02-26 22:19:32.000000000 +0000 @@ -27,6 +27,8 @@ namespace dht { +static constexpr std::array MAPPED_IPV4_PREFIX {{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff}}; + std::vector 
SockAddr::resolve(const std::string& host, const std::string& service) { @@ -139,6 +141,30 @@ } } +bool +SockAddr::isMappedIPv4() const +{ + if (getFamily() != AF_INET6) + return false; + const uint8_t* addr6 = reinterpret_cast(&getIPv6().sin6_addr); + return std::equal(MAPPED_IPV4_PREFIX.begin(), MAPPED_IPV4_PREFIX.end(), addr6); +} + +SockAddr +SockAddr::getMappedIPv4() const +{ + if (not isMappedIPv4()) + return *this; + SockAddr ret; + ret.setFamily(AF_INET); + ret.setPort(getPort()); + auto addr6 = reinterpret_cast(&getIPv6().sin6_addr); + auto addr4 = reinterpret_cast(&ret.getIPv4().sin_addr); + addr6 += MAPPED_IPV4_PREFIX.size(); + std::copy_n(addr6, sizeof(in_addr), addr4); + return ret; +} + bool operator==(const SockAddr& a, const SockAddr& b) { return a.equals(b); } diff -Nru opendht-1.5.0/src/value.cpp opendht-1.6.0/src/value.cpp --- opendht-1.5.0/src/value.cpp 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/src/value.cpp 2018-02-26 22:19:32.000000000 +0000 @@ -22,7 +22,7 @@ #include "default_types.h" #include "securedht.h" // print certificate ID -#if OPENDHT_PROXY_SERVER +#if OPENDHT_PROXY_SERVER || OPENDHT_PROXY_CLIENT #include "base64.h" #endif //OPENDHT_PROXY_SERVER @@ -172,7 +172,7 @@ } } -#if OPENDHT_PROXY_SERVER +#if OPENDHT_PROXY_SERVER || OPENDHT_PROXY_CLIENT Value::Value(Json::Value& json) { try { diff -Nru opendht-1.5.0/tools/dhtchat.cpp opendht-1.6.0/tools/dhtchat.cpp --- opendht-1.5.0/tools/dhtchat.cpp 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/tools/dhtchat.cpp 2018-02-26 22:19:32.000000000 +0000 @@ -67,6 +67,13 @@ if (not params.bootstrap.first.empty()) dht.bootstrap(params.bootstrap.first.c_str(), params.bootstrap.second.c_str()); +#if OPENDHT_PROXY_CLIENT + if (!params.proxyclient.empty()) { + dht.setProxyServer(params.proxyclient); + dht.enableProxy(true); + } +#endif //OPENDHT_PROXY_CLIENT + print_node_info(dht, params); std::cout << " type 'c {hash}' to join a channel" << std::endl << std::endl; diff -Nru 
opendht-1.5.0/tools/dhtnode.cpp opendht-1.6.0/tools/dhtnode.cpp --- opendht-1.5.0/tools/dhtnode.cpp 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/tools/dhtnode.cpp 2018-02-26 22:19:32.000000000 +0000 @@ -1,8 +1,9 @@ /* * Copyright (C) 2014-2017 Savoir-faire Linux Inc. * - * Author: Adrien Béraud - * Simon Désaulniers + * Authors: Adrien Béraud + * Simon Désaulniers + * Sébastien Blin * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -59,14 +60,33 @@ #if OPENDHT_PROXY_SERVER std::cout << std::endl << "Operations with the proxy:" << std::endl - << " pst [port] Start the proxy interface on port." << std::endl - << " psp [port] Stop the proxy interface on port." << std::endl; +#if OPENDHT_PUSH_NOTIFICATIONS + << " pst [port] Start the proxy interface on port." << std::endl +#else + << " pst [port] Start the proxy interface on port." << std::endl +#endif // OPENDHT_PUSH_NOTIFICATIONS + << " psp [port] Stop the proxy interface on port." << std::endl; #endif //OPENDHT_PROXY_SERVER +#if OPENDHT_PROXY_CLIENT + std::cout << std::endl << "Operations with the proxy:" << std::endl +#if OPENDHT_PUSH_NOTIFICATIONS + << " stt [server_address] Start the proxy client." << std::endl +#else + << " stt [server_address] Start the proxy client." << std::endl +#endif // OPENDHT_PUSH_NOTIFICATIONS +#if OPENDHT_PUSH_NOTIFICATIONS + << " rs [token] Resubscribe to opendht." << std::endl + << " rp [push_notification] Inject a push notification in Opendht." << std::endl +#endif // OPENDHT_PUSH_NOTIFICATIONS + << " stp Stop the proxy client." << std::endl; +#endif //OPENDHT_PROXY_CLIENT + std::cout << std::endl << "Operations on the DHT:" << std::endl << " b Ping potential node at given IP address/port." << std::endl << " g Get values at ." << std::endl << " l Listen for value changes at ." << std::endl + << " cl Cancel listen for and ." << std::endl << " p Put string value at ." 
<< std::endl << " pp Put string value at (persistent version)." << std::endl << " s Put string value at , signed with our generated private key." << std::endl @@ -79,7 +99,11 @@ << std::endl; } -void cmd_loop(std::shared_ptr& dht, dht_params& params) +void cmd_loop(std::shared_ptr& dht, dht_params& params +#if OPENDHT_PROXY_SERVER + , std::map>& proxies +#endif +) { print_node_info(dht, params); std::cout << " (type 'h' or 'help' for a list of possible commands)" << std::endl << std::endl; @@ -90,12 +114,6 @@ #endif std::map indexes; -#if OPENDHT_PROXY_SERVER - std::map> proxies; - if (params.proxyserver != 0) { - proxies.emplace(params.proxyserver, new DhtProxyServer(dht, params.proxyserver)); - } -#endif //OPENDHT_PROXY_SERVER while (true) { @@ -105,7 +123,7 @@ break; std::istringstream iss(line); - std::string op, idstr, value, index, keystr; + std::string op, idstr, value, index, keystr, pushServer, deviceKey; iss >> op; if (op == "x" || op == "exit" || op == "quit") { @@ -175,10 +193,18 @@ } #if OPENDHT_PROXY_SERVER else if (op == "pst") { - iss >> idstr; +#if OPENDHT_PUSH_NOTIFICATIONS + iss >> idstr >> pushServer; +#else + iss >> idstr; +#endif // OPENDHT_PUSH_NOTIFICATIONS try { unsigned int port = std::stoi(idstr); - proxies.emplace(port, new DhtProxyServer(dht, port)); +#if OPENDHT_PUSH_NOTIFICATIONS + proxies.emplace(port, std::unique_ptr(new DhtProxyServer(dht, port, pushServer))); +#else + proxies.emplace(port, std::unique_ptr(new DhtProxyServer(dht, port))); +#endif // OPENDHT_PUSH_NOTIFICATIONS } catch (...) 
{ } continue; } else if (op == "psp") { @@ -191,6 +217,29 @@ continue; } #endif //OPENDHT_PROXY_SERVER +#if OPENDHT_PROXY_CLIENT + else if (op == "stt") { + dht->enableProxy(true); + continue; + } else if (op == "stp") { + dht->enableProxy(false); + continue; + } +#if OPENDHT_PUSH_NOTIFICATIONS + else if (op == "rp") { + iss >> value; + dht->pushNotificationReceived({{"token", value}}); + continue; + } else if (op == "re") { + iss >> value; + try { + unsigned token = std::stoul(value); + dht->resubscribe(token); + } catch (...) { } + continue; + } +#endif // OPENDHT_PUSH_NOTIFICATIONS +#endif //OPENDHT_PROXY_CLIENT if (op.empty()) continue; @@ -224,6 +273,10 @@ indexes.emplace(index, Pht {index, std::move(ks), dht}); } catch (std::invalid_argument& e) { std::cout << e.what() << std::endl; } } + } else if (op == "cl") { + std::string hash, rem; + iss >> hash >> rem; + dht->cancelListen(dht::InfoHash(hash), std::stoul(rem)); } else { // Dht syntax @@ -277,12 +330,8 @@ std::cout << "\t" << *value << std::endl; return true; }, {}, dht::Where {std::move(rem)}); - std::cout << "Listening, token: " << token.get() << std::endl; - } - else if (op == "cl") { - std::string rem; - iss >> rem; - dht->cancelListen(id, std::stoul(rem)); + auto t = token.get(); + std::cout << "Listening, token: " << t << std::endl; } else if (op == "p") { std::string v; @@ -418,6 +467,7 @@ print_usage(); return 0; } + if (params.daemonize) { daemonize(); } else if (params.service) { @@ -427,10 +477,19 @@ dht::crypto::Identity crt {}; if (params.generate_identity) { auto ca_tmp = dht::crypto::generateEcIdentity("DHT Node CA"); - crt = dht::crypto::generateEcIdentity("DHT Node", ca_tmp); + crt = dht::crypto::generateIdentity("DHT Node", ca_tmp); } - dht->run(params.port, crt, true, params.network); + dht::DhtRunner::Config config; + config.dht_config.node_config.network = params.network; + config.dht_config.id = crt; + config.threaded = true; + config.proxy_server = params.proxyclient; + 
config.push_node_id = "dhtnode"; + if (not params.proxyclient.empty()) + dht->setPushNotificationToken(params.devicekey); + + dht->run(params.port, config); if (params.log) { if (params.syslog or (params.daemonize and params.logfile.empty())) @@ -446,12 +505,27 @@ dht->bootstrap(params.bootstrap.first.c_str(), params.bootstrap.second.c_str()); } - if (params.daemonize or params.service) { - while (runner.wait()); - } else { - cmd_loop(dht, params); +#if OPENDHT_PROXY_SERVER + std::map> proxies; +#endif + if (params.proxyserver != 0) { +#if OPENDHT_PROXY_SERVER + proxies.emplace(params.proxyserver, std::unique_ptr(new DhtProxyServer(dht, params.proxyserver, params.pushserver))); +#else + std::cerr << "DHT proxy server requested but OpenDHT built without proxy server support." << std::endl; + exit(EXIT_FAILURE); +#endif } + if (params.daemonize or params.service) + while (runner.wait()); + else + cmd_loop(dht, params +#if OPENDHT_PROXY_SERVER + , proxies +#endif + ); + } catch(const std::exception&e) { std::cerr << std::endl << e.what() << std::endl; } diff -Nru opendht-1.5.0/tools/tools_common.h opendht-1.6.0/tools/tools_common.h --- opendht-1.5.0/tools/tools_common.h 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/tools/tools_common.h 2018-02-26 22:19:32.000000000 +0000 @@ -2,6 +2,7 @@ * Copyright (C) 2014-2017 Savoir-faire Linux Inc. 
* * Author: Adrien Béraud + * Author: Sébastien Blin * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -120,6 +121,9 @@ bool service {false}; std::pair bootstrap {}; in_port_t proxyserver {0}; + std::string proxyclient {}; + std::string pushserver {}; + std::string devicekey {}; }; static const constexpr struct option long_options[] = { @@ -134,6 +138,9 @@ {"logfile", required_argument, nullptr, 'l'}, {"syslog", no_argument , nullptr, 'L'}, {"proxyserver",required_argument, nullptr, 'S'}, + {"proxyclient",required_argument, nullptr, 'C'}, + {"pushserver", required_argument, nullptr, 'P'}, + {"devicekey", required_argument, nullptr, 'D'}, {nullptr, 0 , nullptr, 0} }; @@ -159,6 +166,15 @@ std::cout << "Invalid port: " << port_arg << std::endl; } break; + case 'P': + params.pushserver = optarg; + break; + case 'C': + params.proxyclient = optarg; + break; + case 'D': + params.devicekey = optarg; + break; case 'n': params.network = strtoul(optarg, nullptr, 0); break; diff -Nru opendht-1.5.0/.travis.yml opendht-1.6.0/.travis.yml --- opendht-1.5.0/.travis.yml 2017-11-17 18:34:31.000000000 +0000 +++ opendht-1.6.0/.travis.yml 2018-02-26 22:19:32.000000000 +0000 @@ -5,13 +5,67 @@ language: cpp +env: + matrix: + - OPENDHT_TEST_JOB="opendht.classic" + - OPENDHT_TEST_JOB="opendht.llvm" + - OPENDHT_TEST_JOB="opendht.proxyserver" + - OPENDHT_TEST_JOB="opendht.proxyclient" + - OPENDHT_TEST_JOB="opendht.proxyserverpush" + - OPENDHT_TEST_JOB="opendht.proxyclientpush" + - OPENDHT_TEST_JOB="opendht.all" + before_install: -- docker pull aberaud/opendht-deps -- docker pull aberaud/opendht-deps-llvm + - | + # non llvm builds + if [[ "$OPENDHT_TEST_JOB" != *"opendht.llvm"* ]]; then + docker pull aberaud/opendht-deps; + if [[ "$OPENDHT_TEST_JOB" != *"opendht.classic"* ]]; then + docker build -t opendht-deps-proxy -f docker/DockerfileDepsProxy .; + fi + fi + + - | + # classic build + if [[ 
"$OPENDHT_TEST_JOB" == *"opendht.llvm"* ]]; then + docker pull aberaud/opendht-deps-llvm + fi script: -- docker build -f docker/DockerfileTravis . -- docker build -f docker/DockerfileTravisLlvm . + - | + # classic build + if [[ "$OPENDHT_TEST_JOB" == *"opendht.classic"* ]]; then + docker build -t opendht -f docker/DockerfileTravis .; + fi + + - | + # proxy builds + if [[ "$OPENDHT_TEST_JOB" != *"opendht.llvm"* ]] && [[ "$OPENDHT_TEST_JOB" != *"opendht.classic"* ]]; then + docker build -t opendht-proxy -f docker/DockerfileTravisProxy .; + options=''; + if [[ "$OPENDHT_TEST_JOB" == *"opendht.proxyserver"* ]] || [[ "$OPENDHT_TEST_JOB" == *"opendht.proxyserverpush"* ]] || [[ "$OPENDHT_TEST_JOB" == *"opendht.all"* ]]; then + options+='-DOPENDHT_PROXY_SERVER=ON '; + else + options+='-DOPENDHT_PROXY_SERVER=OFF '; + fi + if [[ "$OPENDHT_TEST_JOB" == *"opendht.proxyclient"* ]] || [[ "$OPENDHT_TEST_JOB" == *"opendht.proxyclientpush"* ]] || [[ "$OPENDHT_TEST_JOB" == *"opendht.all"* ]]; then + options+='-DOPENDHT_PROXY_CLIENT=ON '; + else + options+='-DOPENDHT_PROXY_CLIENT=OFF '; + fi + if [[ "$OPENDHT_TEST_JOB" == *"opendht.proxyserverpush"* ]] || [[ "$OPENDHT_TEST_JOB" == *"opendht.proxyclientpush"* ]] || [[ "$OPENDHT_TEST_JOB" == *"opendht.all"* ]]; then + options+='-DOPENDHT_PUSH_NOTIFICATIONS=ON '; + else + options+='-DOPENDHT_PUSH_NOTIFICATIONS=OFF '; + fi + docker run opendht-proxy /bin/sh -c "cd /root/opendht && mkdir build && cd build && cmake -DCMAKE_INSTALL_PREFIX=/usr -DOPENDHT_PYTHON=ON -DOPENDHT_LTO=ON $options .. && make -j8 && make install"; + fi + + - | + # llvm build + if [[ "$OPENDHT_TEST_JOB" == *"opendht.llvm"* ]]; then + docker build -f docker/DockerfileTravisLlvm . + fi notifications: email: