diff --git a/include/opendht/dht.h b/include/opendht/dht.h index 92444b0fd4da044afe53dc125d1670b1565fe08b..0f0357eee1a9fa221bf2fe0750edc86b712d7cdc 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -175,12 +175,12 @@ public: /** * Get locally stored data for the given hash. */ - std::vector<std::shared_ptr<Value>> getLocal(const InfoHash& key, Value::Filter f = Value::AllFilter()) const; + std::vector<Sp<Value>> getLocal(const InfoHash& key, Value::Filter f = Value::AllFilter()) const; /** * Get locally stored data for the given key and value id. */ - std::shared_ptr<Value> getLocalById(const InfoHash& key, Value::Id vid) const; + Sp<Value> getLocalById(const InfoHash& key, Value::Id vid) const; /** * Announce a value on all available protocols (IPv4, IPv6). @@ -189,12 +189,12 @@ public: * The done callback will be called once, when the first announce succeeds, or fails. */ void put(const InfoHash& key, - std::shared_ptr<Value>, + Sp<Value>, DoneCallback cb=nullptr, time_point created=time_point::max(), bool permanent = false); void put(const InfoHash& key, - const std::shared_ptr<Value>& v, + const Sp<Value>& v, DoneCallbackSimple cb, time_point created=time_point::max(), bool permanent = false) @@ -222,12 +222,12 @@ public: /** * Get data currently being put at the given hash. */ - std::vector<std::shared_ptr<Value>> getPut(const InfoHash&); + std::vector<Sp<Value>> getPut(const InfoHash&); /** * Get data currently being put at the given hash with the given id. */ - std::shared_ptr<Value> getPut(const InfoHash&, const Value::Id&); + Sp<Value> getPut(const InfoHash&, const Value::Id&); /** * Stop any put/announce operation at the given location, @@ -390,7 +390,7 @@ private: size_t total_store_size {0}; size_t max_store_size {DEFAULT_STORAGE_LIMIT}; - using SearchMap = std::map<InfoHash, std::shared_ptr<Search>>; + using SearchMap = std::map<InfoHash, Sp<Search>>; SearchMap searches4 {}; SearchMap searches6 {}; uint16_t search_id {0}; @@ -402,8 +402,8 @@ private: // timing Scheduler scheduler; - std::shared_ptr<Scheduler::Job> nextNodesConfirmation {}; - std::shared_ptr<Scheduler::Job> nextStorageMaintenance {}; + Sp<Scheduler::Job> nextNodesConfirmation {}; + Sp<Scheduler::Job> nextStorageMaintenance {}; time_point mybucket_grow_time {time_point::min()}, mybucket6_grow_time {time_point::min()}; net::NetworkEngine network_engine; @@ -421,8 +421,8 @@ private: void reportedAddr(const SockAddr&); // Storage - void storageAddListener(const InfoHash& id, const std::shared_ptr<Node>& node, size_t tid, Query&& = {}); - bool storageStore(const InfoHash& id, const std::shared_ptr<Value>& value, time_point created, const SockAddr* sa = nullptr); + void storageAddListener(const InfoHash& id, const Sp<Node>& node, size_t tid, Query&& = {}); + bool storageStore(const InfoHash& id, const Sp<Value>& value, time_point created, const SockAddr* sa = nullptr); void expireStore(); void expireStorage(InfoHash h); void expireStore(decltype(store)::iterator); @@ -465,10 +465,10 @@ private: void dumpBucket(const Bucket& b, std::ostream& out) const; // Nodes - void onNewNode(const std::shared_ptr<Node>& node, int confirm); - std::shared_ptr<Node> findNode(const InfoHash& id, sa_family_t af); - const std::shared_ptr<Node> findNode(const InfoHash& id, sa_family_t af) const; - bool trySearchInsert(const std::shared_ptr<Node>& node); + void onNewNode(const Sp<Node>& node, int confirm); + Sp<Node> findNode(const InfoHash& id, sa_family_t af); + const Sp<Node> findNode(const InfoHash& id, sa_family_t af) const; + bool trySearchInsert(const Sp<Node>& node); // Searches @@ -479,10 +479,10 @@ private: * Low-level method that will perform a search on the DHT for the specified * infohash (id), using the specified IP version (IPv4 or IPv6). */ - std::shared_ptr<Search> search(const InfoHash& id, sa_family_t af, GetCallback = {}, QueryCallback = {}, DoneCallback = {}, Value::Filter = {}, Query q = {}); + Sp<Search> search(const InfoHash& id, sa_family_t af, GetCallback = {}, QueryCallback = {}, DoneCallback = {}, Value::Filter = {}, Query q = {}); - void announce(const InfoHash& id, sa_family_t af, std::shared_ptr<Value> value, DoneCallback callback, time_point created=time_point::max(), bool permanent = false); - size_t listenTo(const InfoHash& id, sa_family_t af, GetCallback cb, Value::Filter f = Value::AllFilter(), const std::shared_ptr<Query>& q = {}); + void announce(const InfoHash& id, sa_family_t af, Sp<Value> value, DoneCallback callback, time_point created=time_point::max(), bool permanent = false); + size_t listenTo(const InfoHash& id, sa_family_t af, GetCallback cb, Value::Filter f = Value::AllFilter(), const Sp<Query>& q = {}); /** * Refill the search with good nodes if possible. @@ -508,7 +508,7 @@ private: void searchNodeGetDone(const net::Request& status, net::NetworkEngine::RequestAnswer&& answer, std::weak_ptr<Search> ws, - std::shared_ptr<Query> query); + Sp<Query> query); /** * Generic function to execute when a 'get' request expires. @@ -519,7 +519,7 @@ private: * @param ws A weak pointer to the search concerned by the request. * @param query The query sent to the node. */ - void searchNodeGetExpired(const net::Request& status, bool over, std::weak_ptr<Search> ws, std::shared_ptr<Query> query); + void searchNodeGetExpired(const net::Request& status, bool over, std::weak_ptr<Search> ws, Sp<Query> query); /** * This method recovers sends individual request for values per id. @@ -528,12 +528,12 @@ private: * @param query The initial query passed through the API. * @param n The node to which send the requests. */ - void paginate(std::weak_ptr<Search> ws, std::shared_ptr<Query> query, SearchNode* n); + void paginate(std::weak_ptr<Search> ws, Sp<Query> query, SearchNode* n); /** * If update is true, this method will also send message to synced but non-updated search nodes. */ - SearchNode* searchSendGetValues(std::shared_ptr<Search> sr, SearchNode *n = nullptr, bool update = true); + SearchNode* searchSendGetValues(Sp<Search> sr, SearchNode *n = nullptr, bool update = true); /** * Forwards an 'announce' request for a list of nodes to the network engine. @@ -541,7 +541,7 @@ private: * @param sr The search for which we want to announce a value. * @param announce The 'announce' structure. */ - void searchSendAnnounceValue(const std::shared_ptr<Search>& sr); + void searchSendAnnounceValue(const Sp<Search>& sr); /** * Main process of a Search's operations. This function will demand the @@ -550,54 +550,54 @@ private: * * @param sr The search to execute its operations. */ - void searchStep(std::shared_ptr<Search> sr); + void searchStep(Sp<Search> sr); void dumpSearch(const Search& sr, std::ostream& out) const; bool neighbourhoodMaintenance(RoutingTable&); void processMessage(const uint8_t *buf, size_t buflen, const SockAddr&); - void onError(std::shared_ptr<net::Request> node, net::DhtProtocolException e); + void onError(Sp<net::Request> node, net::DhtProtocolException e); /* when our address is reported by a distant peer. */ void onReportedAddr(const InfoHash& id, const SockAddr&); /* when we receive a ping request */ - net::NetworkEngine::RequestAnswer onPing(std::shared_ptr<Node> node); + net::NetworkEngine::RequestAnswer onPing(Sp<Node> node); /* when we receive a "find node" request */ - net::NetworkEngine::RequestAnswer onFindNode(std::shared_ptr<Node> node, const InfoHash& hash, want_t want); - void onFindNodeDone(const std::shared_ptr<Node>& status, + net::NetworkEngine::RequestAnswer onFindNode(Sp<Node> node, const InfoHash& hash, want_t want); + void onFindNodeDone(const Sp<Node>& status, net::NetworkEngine::RequestAnswer& a, - std::shared_ptr<Search> sr); + Sp<Search> sr); /* when we receive a "get values" request */ - net::NetworkEngine::RequestAnswer onGetValues(std::shared_ptr<Node> node, + net::NetworkEngine::RequestAnswer onGetValues(Sp<Node> node, const InfoHash& hash, want_t want, const Query& q); - void onGetValuesDone(const std::shared_ptr<Node>& status, + void onGetValuesDone(const Sp<Node>& status, net::NetworkEngine::RequestAnswer& a, - std::shared_ptr<Search>& sr, - const std::shared_ptr<Query>& orig_query); + Sp<Search>& sr, + const Sp<Query>& orig_query); /* when we receive a listen request */ - net::NetworkEngine::RequestAnswer onListen(std::shared_ptr<Node> node, + net::NetworkEngine::RequestAnswer onListen(Sp<Node> node, const InfoHash& hash, const Blob& token, size_t socket_id, const Query& query); - void onListenDone(const std::shared_ptr<Node>& status, + void onListenDone(const Sp<Node>& status, net::NetworkEngine::RequestAnswer& a, - std::shared_ptr<Search>& sr); + Sp<Search>& sr); /* when we receive an announce request */ - net::NetworkEngine::RequestAnswer onAnnounce(std::shared_ptr<Node> node, + net::NetworkEngine::RequestAnswer onAnnounce(Sp<Node> node, const InfoHash& hash, const Blob& token, - const std::vector<std::shared_ptr<Value>>& v, + const std::vector<Sp<Value>>& v, const time_point& created); - net::NetworkEngine::RequestAnswer onRefresh(std::shared_ptr<Node> node, + net::NetworkEngine::RequestAnswer onRefresh(Sp<Node> node, const InfoHash& hash, const Blob& token, const Value::Id& vid); - void onAnnounceDone(const std::shared_ptr<Node>& status, + void onAnnounceDone(const Sp<Node>& status, net::NetworkEngine::RequestAnswer& a, - std::shared_ptr<Search>& sr); + Sp<Search>& sr); }; } diff --git a/include/opendht/network_engine.h b/include/opendht/network_engine.h index bf54b629f34c79d467050b27fa2aec12348ef458..fb64499e4c95a31784cac65740dc844581ef4357 100644 --- a/include/opendht/network_engine.h +++ b/include/opendht/network_engine.h @@ -112,10 +112,10 @@ public: struct RequestAnswer { Blob ntoken {}; Value::Id vid {}; - std::vector<std::shared_ptr<Value>> values {}; - std::vector<std::shared_ptr<FieldValueIndex>> fields {}; - std::vector<std::shared_ptr<Node>> nodes4 {}; - std::vector<std::shared_ptr<Node>> nodes6 {}; + std::vector<Sp<Value>> values {}; + std::vector<Sp<FieldValueIndex>> fields {}; + std::vector<Sp<Node>> nodes4 {}; + std::vector<Sp<Node>> nodes6 {}; RequestAnswer() {} RequestAnswer(ParsedMessage&& msg); }; @@ -123,102 +123,90 @@ public: private: /** - * @brief when we receive an error message. - * - * @param node (type: std::shared_ptr<Request>) the associated request for - * which we got an error; + * Called when we receive an error message. */ - std::function<void(std::shared_ptr<Request>, DhtProtocolException)> onError; + std::function<void(Sp<Request>, DhtProtocolException)> onError; + /** - * @brief when a new node happens. - * * Called for every packets received for handling new nodes contacting us. * - * @param id (type: InfoHash) id of the node. - * @param saddr (type: sockaddr*) sockaddr* pointer containing address ip information. - * @param saddr_len (type: socklen_t) lenght of the sockaddr struct. - * @param confirm (type: int) 1 if the node sent a message, 2 if it sent us a reply. + * @param node: the node + * @param confirm: 1 if the node sent a message, 2 if it sent us a reply. */ - std::function<void(const std::shared_ptr<Node>&, int)> onNewNode; + std::function<void(const Sp<Node>&, int)> onNewNode; /** - * @brief when an addres is reported from a distant node. + * Called when an addres is reported from a requested node. * - * @param id (type: InfoHash) id of the node. - * @param saddr (type: sockaddr*) sockaddr* pointer containing address ip information. + * @param h: id * @param saddr_len (type: socklen_t) lenght of the sockaddr struct. */ std::function<void(const InfoHash&, const SockAddr&)> onReportedAddr; /** - * @brief on ping request callback. + * Called on ping reception. * - * @param node (type: std::shared_ptr<Node>) the requesting node. + * @param node (type: Sp<Node>) the requesting node. */ - std::function<RequestAnswer(std::shared_ptr<Node>)> onPing {}; + std::function<RequestAnswer(Sp<Node>)> onPing {}; /** - * @brief on find node request callback. + * Called on find node request. * - * @param node (type: std::shared_ptr<Node>) the requesting node. - * @param vhash (type: InfoHash) hash of the value of interest. + * @param node (type: Sp<Node>) the requesting node. + * @param h (type: InfoHash) hash of the value of interest. * @param want (type: want_t) states if nodes sent in the response are ipv4 * or ipv6. */ - std::function<RequestAnswer(std::shared_ptr<Node>, - const InfoHash&, - want_t)> onFindNode {}; + std::function<RequestAnswer(Sp<Node>, const InfoHash&, want_t)> onFindNode {}; /** - * @brief on "get values" request callback. + * Called on "get values" request. * - * @param node (type: std::shared_ptr<Node>) the requesting node. - * @param vhash (type: InfoHash) hash of the value of interest. + * @param node (type: Sp<Node>) the requesting node. + * @param h (type: InfoHash) hash of the value of interest. * @param want (type: want_t) states if nodes sent in the response are ipv4 * or ipv6. */ - std::function<RequestAnswer(std::shared_ptr<Node>, - const InfoHash&, - want_t, - const Query&)> onGetValues {}; + std::function<RequestAnswer(Sp<Node>, const InfoHash&, want_t, const Query&)> onGetValues {}; /** - * @brief on listen request callback. + * Called on listen request. * - * @param node (type: std::shared_ptr<Node>) the requesting node. - * @param vhash (type: InfoHash) hash of the value of interest. + * @param node (type: Sp<Node>) the requesting node. + * @param h (type: InfoHash) hash of the value of interest. * @param token (type: Blob) security token. * @param rid (type: uint16_t) request id. */ - std::function<RequestAnswer(std::shared_ptr<Node>, + std::function<RequestAnswer(Sp<Node>, const InfoHash&, const Blob&, uint32_t, const Query&)> onListen {}; /** - * @brief on announce request callback. + * Called on announce request. * - * @param node (type: std::shared_ptr<Node>) the requesting node. - * @param vhash (type: InfoHash) hash of the value of interest. + * @param node (type: Sp<Node>) the requesting node. + * @param h (type: InfoHash) hash of the value of interest. * @param token (type: Blob) security token. - * @param values (type: std::vector<std::shared_ptr<Value>>) values to store. + * @param values (type: std::vector<Sp<Value>>) values to store. * @param created (type: time_point) time when the value was created. */ - std::function<RequestAnswer(std::shared_ptr<Node>, + std::function<RequestAnswer(Sp<Node>, const InfoHash&, const Blob&, - const std::vector<std::shared_ptr<Value>>&, + const std::vector<Sp<Value>>&, const time_point&)> onAnnounce {}; /** - * @brief on refresh request callback. + * Called on refresh request. * - * @param node (type: std::shared_ptr<Node>) the requesting node. - * @param vhash (type: InfoHash) hash of the value of interest. + * @param node (type: Sp<Node>) the requesting node. + * @param h (type: InfoHash) hash of the value of interest. * @param token (type: Blob) security token. * @param vid (type: Value::id) the value id. */ - std::function<RequestAnswer(std::shared_ptr<Node>, + std::function<RequestAnswer(Sp<Node>, const InfoHash&, const Blob&, const Value::Id&)> onRefresh {}; public: - using SocketCb = std::function<void(const std::shared_ptr<Node>&, RequestAnswer&&)>; + using SocketCb = std::function<void(const Sp<Node>&, RequestAnswer&&)>; using RequestCb = std::function<void(const Request&, RequestAnswer&&)>; using RequestExpiredCb = std::function<void(const Request&, bool)>; @@ -251,9 +239,9 @@ public: * @param nodes6 The ipv6 closest nodes. * @param values The values to send. */ - void tellListener(std::shared_ptr<Node> n, uint32_t socket_id, const InfoHash& hash, want_t want, const Blob& ntoken, - std::vector<std::shared_ptr<Node>>&& nodes, std::vector<std::shared_ptr<Node>>&& nodes6, - std::vector<std::shared_ptr<Value>>&& values, const Query& q); + void tellListener(Sp<Node> n, uint32_t socket_id, const InfoHash& hash, want_t want, const Blob& ntoken, + std::vector<Sp<Node>>&& nodes, std::vector<Sp<Node>>&& nodes6, + std::vector<Sp<Value>>&& values, const Query& q); bool isRunning(sa_family_t af) const; inline want_t want () const { return dht_socket >= 0 && dht_socket6 >= 0 ? (WANT4 | WANT6) : -1; } @@ -262,7 +250,7 @@ public: * Cancel a request. Setting req->cancelled = true is not enough in the case * a request is "persistent". */ - void cancelRequest(std::shared_ptr<Request>& req); + void cancelRequest(Sp<Request>& req); void connectivityChanged(sa_family_t); @@ -279,8 +267,8 @@ public: * * @return the request with information concerning its success. */ - std::shared_ptr<Request> - sendPing(std::shared_ptr<Node> n, RequestCb&& on_done, RequestExpiredCb&& on_expired); + Sp<Request> + sendPing(Sp<Node> n, RequestCb&& on_done, RequestExpiredCb&& on_expired); /** * Send a "ping" request to a given node. * @@ -291,7 +279,7 @@ public: * * @return the request with information concerning its success. */ - std::shared_ptr<Request> + Sp<Request> sendPing(const sockaddr* sa, socklen_t salen, RequestCb&& on_done, RequestExpiredCb&& on_expired) { return sendPing(std::make_shared<Node>(zeroes, sa, salen), std::forward<RequestCb>(on_done), @@ -309,11 +297,11 @@ public: * * @return the request with information concerning its success. */ - std::shared_ptr<Request> sendFindNode(std::shared_ptr<Node> n, - const InfoHash& hash, - want_t want, - RequestCb&& on_done, - RequestExpiredCb&& on_expired); + Sp<Request> sendFindNode(Sp<Node> n, + const InfoHash& hash, + want_t want, + RequestCb&& on_done, + RequestExpiredCb&& on_expired); /** * Send a "get" request to a given node. * @@ -328,12 +316,12 @@ public: * * @return the request with information concerning its success. */ - std::shared_ptr<Request> sendGetValues(std::shared_ptr<Node> n, - const InfoHash& hash, - const Query& query, - want_t want, - RequestCb&& on_done, - RequestExpiredCb&& on_expired); + Sp<Request> sendGetValues(Sp<Node> n, + const InfoHash& hash, + const Query& query, + want_t want, + RequestCb&& on_done, + RequestExpiredCb&& on_expired); /** * Send a "listen" request to a given node. * @@ -357,14 +345,14 @@ public: * * @return the request with information concerning its success. */ - std::shared_ptr<Request> sendListen(std::shared_ptr<Node> n, - const InfoHash& hash, - const Query& query, - const Blob& token, - std::shared_ptr<Request> previous, - RequestCb&& on_done, - RequestExpiredCb&& on_expired, - SocketCb&& socket_cb); + Sp<Request> sendListen(Sp<Node> n, + const InfoHash& hash, + const Query& query, + const Blob& token, + Sp<Request> previous, + RequestCb&& on_done, + RequestExpiredCb&& on_expired, + SocketCb&& socket_cb); /** * Send a "announce" request to a given node. * @@ -378,13 +366,13 @@ public: * * @return the request with information concerning its success. */ - std::shared_ptr<Request> sendAnnounceValue(std::shared_ptr<Node> n, - const InfoHash& hash, - const std::shared_ptr<Value>& v, - time_point created, - const Blob& token, - RequestCb&& on_done, - RequestExpiredCb&& on_expired); + Sp<Request> sendAnnounceValue(Sp<Node> n, + const InfoHash& hash, + const Sp<Value>& v, + time_point created, + const Blob& token, + RequestCb&& on_done, + RequestExpiredCb&& on_expired); /** * Send a "refresh" request to a given node. Asks a node to keep the * associated value Value.type.expiration more minutes in its storage. @@ -398,12 +386,12 @@ public: * * @return the request with information concerning its success. */ - std::shared_ptr<Request> sendRefreshValue(std::shared_ptr<Node> n, - const InfoHash& hash, - const Value::Id& vid, - const Blob& token, - RequestCb&& on_done, - RequestExpiredCb&& on_expired); + Sp<Request> sendRefreshValue(Sp<Node> n, + const InfoHash& hash, + const Value::Id& vid, + const Blob& token, + RequestCb&& on_done, + RequestExpiredCb&& on_expired); /** * Opens a socket on which a node will be able allowed to write for further * additionnal updates following the response to a previous request. @@ -413,14 +401,14 @@ public: * * @return the socket. */ - std::shared_ptr<Socket> openSocket(const std::shared_ptr<Node>& node, TransPrefix tp, SocketCb&& cb); + Sp<Socket> openSocket(const Sp<Node>& node, TransPrefix tp, SocketCb&& cb); /** * Closes a socket so that no further data will be red on that socket. * * @param socket The socket to close. */ - void closeSocket(std::shared_ptr<Socket> socket); + void closeSocket(Sp<Socket> socket); /** * Parses a message and calls appropriate callbacks. @@ -433,7 +421,7 @@ public: */ void processMessage(const uint8_t *buf, size_t buflen, const SockAddr& addr); - std::shared_ptr<Node> insertNode(const InfoHash& myid, const SockAddr& addr) { + Sp<Node> insertNode(const InfoHash& myid, const SockAddr& addr) { auto n = cache.getNode(myid, addr, scheduler.time(), 0); onNewNode(n, 0); return n; @@ -446,9 +434,9 @@ public: return stats; } - void blacklistNode(const std::shared_ptr<Node>& n); + void blacklistNode(const Sp<Node>& n); - std::vector<std::shared_ptr<Node>> getCachedNodes(const InfoHash& id, sa_family_t sa_f, size_t count) { + std::vector<Sp<Node>> getCachedNodes(const InfoHash& id, sa_family_t sa_f, size_t count) { return cache.getCachedNodes(id, sa_f, count); } @@ -488,13 +476,13 @@ private: static bool isMartian(const SockAddr& addr); bool isNodeBlacklisted(const SockAddr& addr) const; - void requestStep(std::shared_ptr<Request> req); + void requestStep(Sp<Request> req); /** * Sends a request to a node. Request::MAX_ATTEMPT_COUNT attempts will * be made before the request expires. */ - void sendRequest(std::shared_ptr<Request>& request); + void sendRequest(Sp<Request>& request); /** * Generates a new request id, skipping the invalid id. @@ -520,7 +508,7 @@ private: int send(const char *buf, size_t len, int flags, const SockAddr& addr); void sendValueParts(TransId tid, const std::vector<Blob>& svals, const SockAddr& addr); - std::vector<Blob> packValueHeader(msgpack::sbuffer&, const std::vector<std::shared_ptr<Value>>&); + std::vector<Blob> packValueHeader(msgpack::sbuffer&, const std::vector<Sp<Value>>&); void maintainRxBuffer(const TransId& tid); /************* @@ -533,16 +521,16 @@ private: TransId tid, const Blob& nodes, const Blob& nodes6, - const std::vector<std::shared_ptr<Value>>& st, + const std::vector<Sp<Value>>& st, const Query& query, const Blob& token); - Blob bufferNodes(sa_family_t af, const InfoHash& id, std::vector<std::shared_ptr<Node>>& nodes); + Blob bufferNodes(sa_family_t af, const InfoHash& id, std::vector<Sp<Node>>& nodes); std::pair<Blob, Blob> bufferNodes(sa_family_t af, const InfoHash& id, want_t want, - std::vector<std::shared_ptr<Node>>& nodes, - std::vector<std::shared_ptr<Node>>& nodes6); + std::vector<Sp<Node>>& nodes, + std::vector<Sp<Node>>& nodes6); /* answer to a listen request */ void sendListenConfirmation(const SockAddr& addr, TransId tid); /* answer to put request */ @@ -601,9 +589,9 @@ private: // requests handling uint16_t transaction_id {1}; - std::map<TransId, std::shared_ptr<Request>> requests {}; + std::map<TransId, Sp<Request>> requests {}; std::map<TransId, PartialMessage> partial_messages; - std::map<TransId, std::shared_ptr<Socket>> opened_sockets {}; + std::map<TransId, Sp<Socket>> opened_sockets {}; MessageStats in_stats {}, out_stats {}; std::set<SockAddr> blacklist {}; diff --git a/include/opendht/routing_table.h b/include/opendht/routing_table.h index 99f2cfe6bd6b8123b68aceaf044c4e7aad189645..b672e6ef1a2ea3d9a952e2c0388997dc879c2e2b 100644 --- a/include/opendht/routing_table.h +++ b/include/opendht/routing_table.h @@ -32,12 +32,12 @@ struct Bucket { : af(af), first(f), time(t), cached() {} sa_family_t af {0}; InfoHash first {}; - time_point time {time_point::min()}; /* time of last reply in this bucket */ - std::list<std::shared_ptr<Node>> nodes {}; - std::shared_ptr<Node> cached; /* the address of a likely candidate */ + time_point time {time_point::min()}; /* time of last reply in this bucket */ + std::list<Sp<Node>> nodes {}; + Sp<Node> cached; /* the address of a likely candidate */ /** Return a random node in a bucket. */ - std::shared_ptr<Node> randomNode(); + Sp<Node> randomNode(); }; class RoutingTable : public std::list<Bucket> { @@ -46,7 +46,7 @@ public: InfoHash middle(const RoutingTable::const_iterator&) const; - std::vector<std::shared_ptr<Node>> findClosestNodes(const InfoHash id, time_point now, size_t count = TARGET_NODES) const; + std::vector<Sp<Node>> findClosestNodes(const InfoHash id, time_point now, size_t count = TARGET_NODES) const; RoutingTable::iterator findBucket(const InfoHash& id); RoutingTable::const_iterator findBucket(const InfoHash& id) const; diff --git a/include/opendht/scheduler.h b/include/opendht/scheduler.h index f75ed6db2ae5f939a98cbbd8356d24f13dc6b8e7..22e6da149199c717b682bb6062890a59a64177da 100644 --- a/include/opendht/scheduler.h +++ b/include/opendht/scheduler.h @@ -52,7 +52,7 @@ public: * * @return pointer to the newly scheduled job. */ - std::shared_ptr<Scheduler::Job> add(time_point t, std::function<void()>&& job_func) { + Sp<Scheduler::Job> add(time_point t, std::function<void()>&& job_func) { auto job = std::make_shared<Job>(std::move(job_func)); if (t != time_point::max()) timers.emplace(std::move(t), job); @@ -67,7 +67,7 @@ public: * * @return pointer to the newly scheduled job. */ - void edit(std::shared_ptr<Scheduler::Job>& job, time_point t) { + void edit(Sp<Scheduler::Job>& job, time_point t) { if (not job) { DHT_LOG.ERR("editing an empty job"); return; @@ -118,7 +118,7 @@ public: private: time_point now {clock::now()}; - std::multimap<time_point, std::shared_ptr<Job>> timers {}; /* the jobs ordered by time */ + std::multimap<time_point, Sp<Job>> timers {}; /* the jobs ordered by time */ const Logger& DHT_LOG; }; diff --git a/include/opendht/securedht.h b/include/opendht/securedht.h index fc073f999d4fcd359a5a76b6a070d9bc1634ce1c..08aea2e06f0fc4226e6ac877cf0822efe6aab6cb 100644 --- a/include/opendht/securedht.h +++ b/include/opendht/securedht.h @@ -105,7 +105,7 @@ public: /** * Will take ownership of the value, sign it using our private key and put it in the DHT. */ - void putSigned(const InfoHash& hash, std::shared_ptr<Value> val, DoneCallback callback, bool permanent = false); + void putSigned(const InfoHash& hash, Sp<Value> val, DoneCallback callback, bool permanent = false); void putSigned(const InfoHash& hash, Value&& v, DoneCallback callback, bool permanent = false) { putSigned(hash, std::make_shared<Value>(std::move(v)), callback, permanent); } @@ -115,7 +115,7 @@ public: * and put it in the DHT. * The operation will be immediate if the recipient' public key is known (otherwise it will be retrived first). */ - void putEncrypted(const InfoHash& hash, const InfoHash& to, std::shared_ptr<Value> val, DoneCallback callback, bool permanent = false); + void putEncrypted(const InfoHash& hash, const InfoHash& to, Sp<Value> val, DoneCallback callback, bool permanent = false); void putEncrypted(const InfoHash& hash, const InfoHash& to, Value&& v, DoneCallback callback, bool permanent = false) { putEncrypted(hash, to, std::make_shared<Value>(std::move(v)), callback, permanent); } @@ -129,14 +129,14 @@ public: Value decrypt(const Value& v); - void findCertificate(const InfoHash& node, std::function<void(const std::shared_ptr<crypto::Certificate>)> cb); - void findPublicKey(const InfoHash& node, std::function<void(const std::shared_ptr<const crypto::PublicKey>)> cb); + void findCertificate(const InfoHash& node, std::function<void(const Sp<crypto::Certificate>)> cb); + void findPublicKey(const InfoHash& node, std::function<void(const Sp<const crypto::PublicKey>)> cb); - const std::shared_ptr<crypto::Certificate> registerCertificate(const InfoHash& node, const Blob& cert); - void registerCertificate(std::shared_ptr<crypto::Certificate>& cert); + const Sp<crypto::Certificate> registerCertificate(const InfoHash& node, const Blob& cert); + void registerCertificate(Sp<crypto::Certificate>& cert); - const std::shared_ptr<crypto::Certificate> getCertificate(const InfoHash& node) const; - const std::shared_ptr<const crypto::PublicKey> getPublicKey(const InfoHash& node) const; + const Sp<crypto::Certificate> getCertificate(const InfoHash& node) const; + const Sp<const crypto::PublicKey> getPublicKey(const InfoHash& node) const; /** * Allows to set a custom callback called by the library to find a locally-stored certificate. @@ -154,15 +154,15 @@ private: GetCallback getCallbackFilter(GetCallback, Value::Filter&&); - std::shared_ptr<crypto::PrivateKey> key_ {}; - std::shared_ptr<crypto::Certificate> certificate_ {}; + Sp<crypto::PrivateKey> key_ {}; + Sp<crypto::Certificate> certificate_ {}; // method to query the local certificate store CertificateStoreQuery localQueryMethod_ {}; // our certificate cache - std::map<InfoHash, std::shared_ptr<crypto::Certificate>> nodesCertificates_ {}; - std::map<InfoHash, std::shared_ptr<const crypto::PublicKey>> nodesPubKeys_ {}; + std::map<InfoHash, Sp<crypto::Certificate>> nodesCertificates_ {}; + std::map<InfoHash, Sp<const crypto::PublicKey>> nodesPubKeys_ {}; std::uniform_int_distribution<Value::Id> rand_id {}; }; @@ -170,7 +170,7 @@ private: const ValueType CERTIFICATE_TYPE = { 8, "Certificate", std::chrono::hours(24 * 7), // A certificate can only be stored at its public key ID. - [](InfoHash id, std::shared_ptr<Value>& v, InfoHash, const sockaddr*, socklen_t) { + [](InfoHash id, Sp<Value>& v, InfoHash, const sockaddr*, socklen_t) { try { crypto::Certificate crt(v->data); // TODO check certificate signature @@ -178,7 +178,7 @@ const ValueType CERTIFICATE_TYPE = { } catch (const std::exception& e) {} return false; }, - [](InfoHash, const std::shared_ptr<Value>& o, std::shared_ptr<Value>& n, InfoHash, const sockaddr*, socklen_t) { + [](InfoHash, const Sp<Value>& o, Sp<Value>& n, InfoHash, const sockaddr*, socklen_t) { try { return crypto::Certificate(o->data).getPublicKey().getId() == crypto::Certificate(n->data).getPublicKey().getId(); } catch (const std::exception& e) {} diff --git a/include/opendht/utils.h b/include/opendht/utils.h index 32c06e7bbc9e66adb0bda0ae557b50ccb55497f2..1e74c3dd0d03aacfd47e6408b83b7d671c0358c8 100644 --- a/include/opendht/utils.h +++ b/include/opendht/utils.h @@ -37,6 +37,10 @@ namespace dht { using NetId = uint32_t; using want_t = int_fast8_t; +// shortcut for std::shared_ptr +template<class T> +using Sp = std::shared_ptr<T>; + template <typename Key, typename Item, typename Condition> void erase_if(std::map<Key, Item>& map, const Condition& condition) { @@ -53,7 +57,6 @@ class OPENDHT_PUBLIC DhtException : public std::runtime_error { std::runtime_error("DhtException occurred: " + str) {} }; - // Time related definitions and utility functions using clock = std::chrono::steady_clock; diff --git a/src/dht.cpp b/src/dht.cpp index a16c6b1a90f73a316385e45e20a0fd0aaac2f0f6..6b93871ca681469f3baa9e0ad08608e0a7ed7bb7 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -136,19 +136,19 @@ private: }; struct Dht::ValueStorage { - std::shared_ptr<Value> data {}; + Sp<Value> data {}; time_point created {}; time_point expiration {}; StorageBucket* store_bucket {nullptr}; ValueStorage() {} - ValueStorage(const std::shared_ptr<Value>& v, time_point t, time_point e) + ValueStorage(const Sp<Value>& v, time_point t, time_point e) : data(v), created(t), expiration(e) {} }; struct Dht::Storage { time_point maintenance_time {}; - std::map<std::shared_ptr<Node>, std::map<size_t, Listener>> listeners; + std::map<Sp<Node>, std::map<size_t, Listener>> listeners; std::map<size_t, LocalListener> local_listeners {}; size_t listener_token {1}; @@ -198,14 +198,14 @@ struct Dht::Storage { const std::vector<ValueStorage>& getValues() const { return values; } - std::shared_ptr<Value> getById(Value::Id vid) const { + Sp<Value> getById(Value::Id vid) const { for (auto& v : values) if (v.data->id == vid) return v.data; return {}; } - std::vector<std::shared_ptr<Value>> get(Value::Filter f = {}) const { - std::vector<std::shared_ptr<Value>> newvals {}; + std::vector<Sp<Value>> get(Value::Filter f = {}) const { + std::vector<Sp<Value>> newvals {}; if (not f) newvals.reserve(values.size()); for (auto& v : values) { if (not f || f(*v.data)) @@ -223,7 +223,7 @@ struct Dht::Storage { * change_value_num: change of value number (0 or 1) */ std::pair<ValueStorage*, StoreDiff> - store(const InfoHash& id, const std::shared_ptr<Value>&, time_point created, time_point expiration, StorageBucket*); + store(const InfoHash& id, const Sp<Value>&, time_point created, time_point expiration, StorageBucket*); /** * Refreshes the time point of the value's lifetime begining. @@ -260,7 +260,7 @@ private: struct Dht::Get { time_point start; Value::Filter filter; - std::shared_ptr<Query> query; + Sp<Query> query; QueryCallback query_cb; GetCallback get_cb; DoneCallback done_cb; @@ -271,7 +271,7 @@ struct Dht::Get { */ struct Dht::Announce { bool permanent; - std::shared_ptr<Value> value; + Sp<Value> value; time_point created; DoneCallback callback; }; @@ -280,7 +280,7 @@ struct Dht::Announce { * A single "listen" operation data */ struct Dht::LocalListener { - std::shared_ptr<Query> query; + Sp<Query> query; Value::Filter filter; GetCallback get_cb; }; @@ -291,19 +291,19 @@ struct Dht::SearchNode { * request is the request returned by the network engine and the time_point * is the next time at which the value must be refreshed. */ - using AnnounceStatus = std::map<Value::Id, std::pair<std::shared_ptr<net::Request>, time_point>>; + using AnnounceStatus = std::map<Value::Id, std::pair<Sp<net::Request>, time_point>>; /** * Foreach Query, we keep track of the request returned by the network * engine when we sent the "get". */ - using SyncStatus = std::map<std::shared_ptr<Query>, std::shared_ptr<net::Request>>; + using SyncStatus = std::map<Sp<Query>, Sp<net::Request>>; - std::shared_ptr<Node> node {}; /* the node info */ + Sp<Node> node {}; /* the node info */ /* queries sent for finding out values hosted by the node */ - std::shared_ptr<Query> probe_query {}; + Sp<Query> probe_query {}; /* queries substituting formal 'get' requests */ - std::map<std::shared_ptr<Query>, std::vector<std::shared_ptr<Query>>> pagination_queries {}; + std::map<Sp<Query>, std::vector<Sp<Query>>> pagination_queries {}; SyncStatus getStatus {}; /* get/sync status */ SyncStatus listenStatus {}; /* listen status */ @@ -315,7 +315,7 @@ struct Dht::SearchNode { node is a new candidate for inclusion. */ SearchNode() : node() {} - SearchNode(const std::shared_ptr<Node>& node) : node(node) {} + SearchNode(const Sp<Node>& node) : node(node) {} /** * Can we use this node to listen/announce now ? @@ -343,7 +343,7 @@ struct Dht::SearchNode { * * @return true if we can send get, else false. */ - bool canGet(time_point now, time_point update, std::shared_ptr<Query> q = {}) const { + bool canGet(time_point now, time_point update, Sp<Query> q = {}) const { if (node->isExpired()) return false; @@ -374,12 +374,12 @@ struct Dht::SearchNode { * * @return true if pagination process has started, else false. */ - bool hasStartedPagination(const std::shared_ptr<Query>& q) const { + bool hasStartedPagination(const Sp<Query>& q) const { const auto& pqs = pagination_queries.find(q); if (pqs == pagination_queries.cend() or pqs->second.empty()) return false; return std::find_if(pqs->second.cbegin(), pqs->second.cend(), - [this](const std::shared_ptr<Query>& query) { + [this](const Sp<Query>& query) { const auto& req = getStatus.find(query); return req != getStatus.cend() and req->second; }) != pqs->second.cend(); @@ -401,7 +401,7 @@ struct Dht::SearchNode { if (hasStartedPagination(get.query)) { const auto& pqs = pagination_queries.find(get.query); auto paginationPending = std::find_if(pqs->second.cbegin(), pqs->second.cend(), - [this](const std::shared_ptr<Query>& query) { + [this](const Sp<Query>& query) { const auto& req = getStatus.find(query); return req != getStatus.cend() and req->second and req->second->pending(); }) != pqs->second.cend(); @@ -456,7 +456,7 @@ struct Dht::SearchNode { } return ls != listenStatus.end(); } - bool isListening(time_point now, const std::shared_ptr<Query>& q) const { + bool isListening(time_point now, const Sp<Query>& q) const { const auto& ls = listenStatus.find(q); if (ls == listenStatus.end()) return false; @@ -488,7 +488,7 @@ struct Dht::SearchNode { * Assumng the node is synced, should the "listen" request with Query q be * sent to this node now ? */ - time_point getListenTime(const std::shared_ptr<Query>& q) const { + time_point getListenTime(const Sp<Query>& q) const { auto listen_status = listenStatus.find(q); if (listen_status == listenStatus.end() or not listen_status->second) return time_point::min(); @@ -515,7 +515,7 @@ struct Dht::Search { uint16_t tid; time_point refill_time {time_point::min()}; time_point step_time {time_point::min()}; /* the time of the last search step */ - std::shared_ptr<Scheduler::Job> nextSearchStep {}; + Sp<Scheduler::Job> nextSearchStep {}; bool expired {false}; /* no node, or all nodes expired */ bool done {false}; /* search is over, cached for later */ @@ -545,9 +545,9 @@ struct Dht::Search { /** * @returns true if the node was not present and added to the search */ - bool insertNode(const std::shared_ptr<Node>& n, time_point now, const Blob& token={}); + bool insertNode(const Sp<Node>& n, time_point now, const Blob& token={}); - SearchNode* getNode(const std::shared_ptr<Node>& n) { + SearchNode* getNode(const Sp<Node>& n) { auto srn = std::find_if(nodes.begin(), nodes.end(), [&](SearchNode& sn) { return n == sn.node; }); @@ -574,7 +574,7 @@ struct Dht::Search { * * @param query The query identifying a 'get' request. */ - time_point getLastGetTime(std::shared_ptr<Query> query = {}) const; + time_point getLastGetTime(Sp<Query> query = {}) const; /** * Is this get operation done ? @@ -751,7 +751,7 @@ struct Dht::Search { announce.erase(announced, announce.end()); } - std::vector<std::shared_ptr<Node>> getNodes() const; + std::vector<Sp<Node>> getNodes() const; void clear() { announce.clear(); @@ -793,7 +793,7 @@ Dht::shutdown(ShutdownCallback cb) // Last store maintenance scheduler.syncTime(); auto remaining = std::make_shared<int>(0); - auto str_donecb = [=](bool, const std::vector<std::shared_ptr<Node>>&) { + auto str_donecb = [=](bool, const std::vector<Sp<Node>>&) { --*remaining; DHT_LOG.w("shuting down node: %u ops remaining", *remaining); if (!*remaining && cb) { cb(); } @@ -813,7 +813,7 @@ bool Dht::isRunning(sa_family_t af) const { return network_engine.isRunning(af); } /* Every bucket contains an unordered list of nodes. */ -std::shared_ptr<Node> +Sp<Node> Dht::findNode(const InfoHash& id, sa_family_t af) { Bucket* b = findBucket(id, af); @@ -824,7 +824,7 @@ Dht::findNode(const InfoHash& id, sa_family_t af) return {}; } -const std::shared_ptr<Node> +const Sp<Node> Dht::findNode(const InfoHash& id, sa_family_t af) const { const Bucket* b = findBucket(id, af); @@ -863,7 +863,7 @@ Dht::getPublicAddress(sa_family_t family) } bool -Dht::trySearchInsert(const std::shared_ptr<Node>& node) +Dht::trySearchInsert(const Sp<Node>& node) { const auto& now = scheduler.time(); if (not node) return false; @@ -913,7 +913,7 @@ Dht::reportedAddr(const SockAddr& addr) /* We just learnt about a node, not necessarily a new one. Confirm is 1 if the node sent a message, 2 if it sent us a reply. */ void -Dht::onNewNode(const std::shared_ptr<Node>& node, int confirm) +Dht::onNewNode(const Sp<Node>& node, int confirm) { auto& list = buckets(node->getFamily()); auto b = list.findBucket(node->id); @@ -992,7 +992,7 @@ Dht::expireBuckets(RoutingTable& list) { for (auto& b : list) { bool changed = false; - b.nodes.remove_if([this,&changed](const std::shared_ptr<Node>& n) { + b.nodes.remove_if([this,&changed](const Sp<Node>& n) { if (n->isExpired()) { changed = true; return true; @@ -1008,7 +1008,7 @@ Dht::expireBuckets(RoutingTable& list) target. We just got a new candidate, insert it at the right spot or discard it. */ bool -Dht::Search::insertNode(const std::shared_ptr<Node>& snode, time_point now, const Blob& token) +Dht::Search::insertNode(const Sp<Node>& snode, time_point now, const Blob& token) { auto& node = *snode; const auto& nid = node.id; @@ -1095,10 +1095,10 @@ Dht::Search::insertNode(const std::shared_ptr<Node>& snode, time_point now, cons return new_search_node; } -std::vector<std::shared_ptr<Node>> +std::vector<Sp<Node>> Dht::Search::getNodes() const { - std::vector<std::shared_ptr<Node>> ret {}; + std::vector<Sp<Node>> ret {}; ret.reserve(nodes.size()); for (const auto& sn : nodes) ret.emplace_back(sn.node); @@ -1109,7 +1109,7 @@ void Dht::expireSearches() { auto t = scheduler.time() - SEARCH_EXPIRE_TIME; - auto expired = [&](std::pair<const InfoHash, std::shared_ptr<Search>>& srp) { + auto expired = [&](std::pair<const InfoHash, Sp<Search>>& srp) { auto& sr = *srp.second; auto b = sr.callbacks.empty() && sr.announce.empty() && sr.listeners.empty() && sr.step_time < t; if (b) { @@ -1126,7 +1126,7 @@ void Dht::searchNodeGetDone(const net::Request& req, net::NetworkEngine::RequestAnswer&& answer, std::weak_ptr<Search> ws, - std::shared_ptr<Query> query) + Sp<Query> query) { const auto& now = scheduler.time(); if (auto sr = ws.lock()) { @@ -1151,7 +1151,7 @@ void Dht::searchNodeGetExpired(const net::Request& status, bool over, std::weak_ptr<Search> ws, - std::shared_ptr<Query> query) + Sp<Query> query) { if (auto sr = ws.lock()) { if (auto srn = sr->getNode(status.node)) { @@ -1163,7 +1163,7 @@ Dht::searchNodeGetExpired(const net::Request& status, } } -void Dht::paginate(std::weak_ptr<Search> ws, std::shared_ptr<Query> query, SearchNode* n) { +void Dht::paginate(std::weak_ptr<Search> ws, Sp<Query> query, SearchNode* n) { auto sr = ws.lock(); if (not sr) return; auto select_q = std::make_shared<Query>(Select {}.field(Value::Field::Id), query ? query->where : Where {}); @@ -1218,7 +1218,7 @@ void Dht::paginate(std::weak_ptr<Search> ws, std::shared_ptr<Query> query, Searc } Dht::SearchNode* -Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update) +Dht::searchSendGetValues(Sp<Search> sr, SearchNode* pn, bool update) { if (sr->done or sr->currentlySolicitedNodeCount() >= MAX_REQUESTED_SEARCH_NODES) return nullptr; @@ -1284,7 +1284,7 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update return nullptr; } -void Dht::searchSendAnnounceValue(const std::shared_ptr<Search>& sr) { +void Dht::searchSendAnnounceValue(const Sp<Search>& sr) { if (sr->announce.empty()) return; unsigned i = 0; @@ -1325,7 +1325,7 @@ void Dht::searchSendAnnounceValue(const std::shared_ptr<Search>& sr) { uint16_t seq_no = 0; try { const auto& f = std::find_if(answer.fields.cbegin(), answer.fields.cend(), - [&a](const std::shared_ptr<FieldValueIndex>& i){ + [&a](const Sp<FieldValueIndex>& i){ return i->index.at(Value::Field::Id).getInt() == a.value->id; }); if (f != answer.fields.cend() and *f) { @@ -1391,7 +1391,7 @@ void Dht::searchSendAnnounceValue(const std::shared_ptr<Search>& sr) { /* When a search is in progress, we periodically call search_step to send further requests. */ void -Dht::searchStep(std::shared_ptr<Search> sr) +Dht::searchStep(Sp<Search> sr) { if (not sr or sr->expired or sr->done) return; @@ -1471,7 +1471,7 @@ Dht::searchStep(std::shared_ptr<Search> sr) sn->listenStatus.erase(query); } }, - [this,ws,query](const std::shared_ptr<Node>& node, net::NetworkEngine::RequestAnswer&& answer) mutable + [this,ws,query](const Sp<Node>& node, net::NetworkEngine::RequestAnswer&& answer) mutable { /* on new values */ if (auto sr = ws.lock()) { onGetValuesDone(node, answer, sr, query); @@ -1537,7 +1537,7 @@ Dht::Search::isSynced(time_point now) const } time_point -Dht::Search::getLastGetTime(std::shared_ptr<Query> q) const +Dht::Search::getLastGetTime(Sp<Query> q) const { time_point last = time_point::min(); for (const auto& g : callbacks) @@ -1727,7 +1727,7 @@ unsigned Dht::refill(Dht::Search& sr) { /* Start a search. */ -std::shared_ptr<Dht::Search> +Sp<Dht::Search> Dht::search(const InfoHash& id, sa_family_t af, GetCallback gcb, QueryCallback qcb, DoneCallback dcb, Value::Filter f, Query q) { if (!isRunning(af)) { @@ -1739,7 +1739,7 @@ Dht::search(const InfoHash& id, sa_family_t af, GetCallback gcb, QueryCallback q auto& srs = searches(af); const auto& srp = srs.find(id); - std::shared_ptr<Search> sr {}; + Sp<Search> sr {}; if (srp != srs.end()) { sr = srp->second; @@ -1795,7 +1795,7 @@ Dht::search(const InfoHash& id, sa_family_t af, GetCallback gcb, QueryCallback q void Dht::announce(const InfoHash& id, sa_family_t af, - std::shared_ptr<Value> value, + Sp<Value> value, DoneCallback callback, time_point created, bool permanent) @@ -1855,7 +1855,7 @@ Dht::announce(const InfoHash& id, } size_t -Dht::listenTo(const InfoHash& id, sa_family_t af, GetCallback cb, Value::Filter f, const std::shared_ptr<Query>& q) +Dht::listenTo(const InfoHash& id, sa_family_t af, GetCallback cb, Value::Filter f, const Sp<Query>& q) { const auto& now = scheduler.time(); if (!isRunning(af)) @@ -1865,7 +1865,7 @@ Dht::listenTo(const InfoHash& id, sa_family_t af, GetCallback cb, Value::Filter //DHT_LOG_WARN("listenTo %s", id.toString().c_str()); auto& srs = searches(af); auto srp = srs.find(id); - std::shared_ptr<Search> sr = (srp == srs.end()) ? search(id, af) : srp->second; + Sp<Search> sr = (srp == srs.end()) ? search(id, af) : srp->second; if (!sr) throw DhtException("Can't create search"); DHT_LOG.e(id, "[search %s IPv%c] listen", id.toString().c_str(), (af == AF_INET) ? '4' : '6'); @@ -1882,11 +1882,11 @@ Dht::listen(const InfoHash& id, GetCallback cb, Value::Filter&& f, Where&& where scheduler.syncTime(); Query q {{}, where}; - auto vals = std::make_shared<std::map<Value::Id, std::shared_ptr<Value>>>(); + auto vals = std::make_shared<std::map<Value::Id, Sp<Value>>>(); auto token = ++listener_token; - auto gcb = [=](const std::vector<std::shared_ptr<Value>>& values) { - std::vector<std::shared_ptr<Value>> newvals; + auto gcb = [=](const std::vector<Sp<Value>>& values) { + std::vector<Sp<Value>> newvals; for (const auto& v : values) { auto it = vals->find(v->id); if (it == vals->cend() || !(*it->second == *v)) @@ -1914,7 +1914,7 @@ Dht::listen(const InfoHash& id, GetCallback cb, Value::Filter&& f, Where&& where st = store.emplace(id, Storage(scheduler.time())).first; if (st != store.end()) { if (not st->second.empty()) { - std::vector<std::shared_ptr<Value>> newvals = st->second.get(filter); + std::vector<Sp<Value>> newvals = st->second.get(filter); if (not newvals.empty()) { if (!cb(newvals)) return 0; @@ -1953,14 +1953,14 @@ Dht::cancelListen(const InfoHash& id, size_t token) if (st != store.end() && tokenlocal) st->second.local_listeners.erase(tokenlocal); - auto searches_cancel_listen = [&](std::map<InfoHash, std::shared_ptr<Search>> srs) { + auto searches_cancel_listen = [&](std::map<InfoHash, Sp<Search>> srs) { for (auto& sp : srs) { auto& s = sp.second; if (s->id != id) continue; auto af_token = s->af == AF_INET ? std::get<1>(it->second) : std::get<2>(it->second); if (af_token == 0) continue; - std::shared_ptr<Query> query; + Sp<Query> query; const auto& ll = s->listeners.find(af_token); if (ll != s->listeners.cend()) query = ll->second.query; @@ -1987,7 +1987,7 @@ Dht::cancelListen(const InfoHash& id, size_t token) } void -Dht::put(const InfoHash& id, std::shared_ptr<Value> val, DoneCallback callback, time_point created, bool permanent) +Dht::put(const InfoHash& id, Sp<Value> val, DoneCallback callback, time_point created, bool permanent) { scheduler.syncTime(); @@ -2003,20 +2003,20 @@ Dht::put(const InfoHash& id, std::shared_ptr<Value> val, DoneCallback callback, auto done = std::make_shared<bool>(false); auto done4 = std::make_shared<bool>(false); auto done6 = std::make_shared<bool>(false); - auto donecb = [=](const std::vector<std::shared_ptr<Node>>& nodes) { + auto donecb = [=](const std::vector<Sp<Node>>& nodes) { // Callback as soon as the value is announced on one of the available networks if (callback && !*done && (*done4 && *done6)) { callback(*ok, nodes); *done = true; } }; - announce(id, AF_INET, val, [=](bool ok4, const std::vector<std::shared_ptr<Node>>& nodes) { + announce(id, AF_INET, val, [=](bool ok4, const std::vector<Sp<Node>>& nodes) { DHT_LOG.d(id, "Announce done IPv4 %d", ok4); *done4 = true; *ok |= ok4; donecb(nodes); }, created, permanent); - announce(id, AF_INET6, val, [=](bool ok6, const std::vector<std::shared_ptr<Node>>& nodes) { + announce(id, AF_INET6, val, [=](bool ok6, const std::vector<Sp<Node>>& nodes) { DHT_LOG.d(id, "Announce done IPv6 %d", ok6); *done6 = true; *ok |= ok6; @@ -2033,12 +2033,12 @@ struct OpStatus { Status status; Status status4; Status status6; - std::vector<std::shared_ptr<T>> values; - std::vector<std::shared_ptr<Node>> nodes; + std::vector<Sp<T>> values; + std::vector<Sp<Node>> nodes; }; template <typename T> -void doneCallbackWrapper(DoneCallback dcb, const std::vector<std::shared_ptr<Node>>& nodes, std::shared_ptr<OpStatus<T>> op) { +void doneCallbackWrapper(DoneCallback dcb, const std::vector<Sp<Node>>& nodes, Sp<OpStatus<T>> op) { if (op->status.done) return; op->nodes.insert(op->nodes.end(), nodes.begin(), nodes.end()); @@ -2053,9 +2053,9 @@ void doneCallbackWrapper(DoneCallback dcb, const std::vector<std::shared_ptr<Nod template <typename T, typename Cb> bool callbackWrapper(Cb get_cb, DoneCallback done_cb, - const std::vector<std::shared_ptr<T>>& values, - std::function<std::vector<std::shared_ptr<T>>(const std::vector<std::shared_ptr<T>>&)> add_values, - std::shared_ptr<OpStatus<T>> op) + const std::vector<Sp<T>>& values, + std::function<std::vector<Sp<T>>(const std::vector<Sp<T>>&)> add_values, + Sp<OpStatus<T>> op) { if (op->status.done) return false; @@ -2077,10 +2077,10 @@ Dht::get(const InfoHash& id, GetCallback getcb, DoneCallback donecb, Value::Filt auto op = std::make_shared<OpStatus<Value>>(); auto f = filter.chain(q.where.getFilter()); - auto add_values = [op,f](const std::vector<std::shared_ptr<Value>>& values) { - std::vector<std::shared_ptr<Value>> newvals {}; + auto add_values = [op,f](const std::vector<Sp<Value>>& values) { + std::vector<Sp<Value>> newvals {}; for (const auto& v : values) { - auto it = std::find_if(op->values.cbegin(), op->values.cend(), [&](const std::shared_ptr<Value>& sv) { + auto it = std::find_if(op->values.cbegin(), op->values.cend(), [&](const Sp<Value>& sv) { return sv == v or *sv == *v; }); if (it == op->values.cend()) { @@ -2095,13 +2095,13 @@ Dht::get(const InfoHash& id, GetCallback getcb, DoneCallback donecb, Value::Filt /* Try to answer this search locally. */ gcb(getLocal(id, f)); - Dht::search(id, AF_INET, gcb, {}, [=](bool ok, const std::vector<std::shared_ptr<Node>>& nodes) { + Dht::search(id, AF_INET, gcb, {}, [=](bool ok, const std::vector<Sp<Node>>& nodes) { //DHT_LOG_WARN("DHT done IPv4"); op->status4.done = true; op->status4.ok = ok; doneCallbackWrapper(donecb, nodes, op); }, f, q); - Dht::search(id, AF_INET6, gcb, {}, [=](bool ok, const std::vector<std::shared_ptr<Node>>& nodes) { + Dht::search(id, AF_INET6, gcb, {}, [=](bool ok, const std::vector<Sp<Node>>& nodes) { //DHT_LOG_WARN("DHT done IPv6"); op->status6.done = true; op->status6.ok = ok; @@ -2116,16 +2116,16 @@ void Dht::query(const InfoHash& id, QueryCallback cb, DoneCallback done_cb, Quer auto f = q.where.getFilter(); auto values = getLocal(id, f); - auto add_fields = [=](const std::vector<std::shared_ptr<FieldValueIndex>>& fields) { - std::vector<std::shared_ptr<FieldValueIndex>> newvals {}; + auto add_fields = [=](const std::vector<Sp<FieldValueIndex>>& fields) { + std::vector<Sp<FieldValueIndex>> newvals {}; for (const auto& f : fields) { auto it = std::find_if(op->values.cbegin(), op->values.cend(), - [&](const std::shared_ptr<FieldValueIndex>& sf) { + [&](const Sp<FieldValueIndex>& sf) { return sf == f or f->containedIn(*sf); }); if (it == op->values.cend()) { auto lesser = std::find_if(op->values.begin(), op->values.end(), - [&](const std::shared_ptr<FieldValueIndex>& sf) { + [&](const Sp<FieldValueIndex>& sf) { return sf->containedIn(*f); }); if (lesser != op->values.end()) @@ -2135,8 +2135,8 @@ void Dht::query(const InfoHash& id, QueryCallback cb, DoneCallback done_cb, Quer } return newvals; }; - std::vector<std::shared_ptr<FieldValueIndex>> local_fields(values.size()); - std::transform(values.begin(), values.end(), local_fields.begin(), [&q](const std::shared_ptr<Value>& v) { + std::vector<Sp<FieldValueIndex>> local_fields(values.size()); + std::transform(values.begin(), values.end(), local_fields.begin(), [&q](const Sp<Value>& v) { return std::make_shared<FieldValueIndex>(*v, q.select); }); auto qcb = std::bind(callbackWrapper<FieldValueIndex, QueryCallback>, cb, done_cb, _1, add_fields, op); @@ -2144,13 +2144,13 @@ void Dht::query(const InfoHash& id, QueryCallback cb, DoneCallback done_cb, Quer /* Try to answer this search locally. */ qcb(local_fields); - Dht::search(id, AF_INET, {}, qcb, [=](bool ok, const std::vector<std::shared_ptr<Node>>& nodes) { + Dht::search(id, AF_INET, {}, qcb, [=](bool ok, const std::vector<Sp<Node>>& nodes) { //DHT_LOG_WARN("DHT done IPv4"); op->status4.done = true; op->status4.ok = ok; doneCallbackWrapper(done_cb, nodes, op); }, f, q); - Dht::search(id, AF_INET6, {}, qcb, [=](bool ok, const std::vector<std::shared_ptr<Node>>& nodes) { + Dht::search(id, AF_INET6, {}, qcb, [=](bool ok, const std::vector<Sp<Node>>& nodes) { //DHT_LOG_WARN("DHT done IPv6"); op->status6.done = true; op->status6.ok = ok; @@ -2158,7 +2158,7 @@ void Dht::query(const InfoHash& id, QueryCallback cb, DoneCallback done_cb, Quer }, f, q); } -std::vector<std::shared_ptr<Value>> +std::vector<Sp<Value>> Dht::getLocal(const InfoHash& id, Value::Filter f) const { auto s = store.find(id); @@ -2166,7 +2166,7 @@ Dht::getLocal(const InfoHash& id, Value::Filter f) const return s->second.get(f); } -std::shared_ptr<Value> +Sp<Value> Dht::getLocalById(const InfoHash& id, Value::Id vid) const { auto s = store.find(id); @@ -2175,11 +2175,11 @@ Dht::getLocalById(const InfoHash& id, Value::Id vid) const return {}; } -std::vector<std::shared_ptr<Value>> +std::vector<Sp<Value>> Dht::getPut(const InfoHash& id) { - std::vector<std::shared_ptr<Value>> ret; - auto find_values = [&](std::map<InfoHash, std::shared_ptr<Search>> srs) { + std::vector<Sp<Value>> ret; + auto find_values = [&](std::map<InfoHash, Sp<Search>> srs) { auto srp = srs.find(id); if (srp == srs.end()) return; @@ -2193,19 +2193,19 @@ Dht::getPut(const InfoHash& id) return ret; } -std::shared_ptr<Value> +Sp<Value> Dht::getPut(const InfoHash& id, const Value::Id& vid) { - auto find_value = [&](std::map<InfoHash, std::shared_ptr<Search>> srs) { + auto find_value = [&](std::map<InfoHash, Sp<Search>> srs) { auto srp = srs.find(id); if (srp == srs.end()) - return std::shared_ptr<Value> {}; + return Sp<Value> {}; auto& search = srp->second; for (auto& a : search->announce) { if (a.value->id == vid) return a.value; } - return std::shared_ptr<Value> {}; + return Sp<Value> {}; }; auto v4 = find_value(searches4); if (v4) return v4; @@ -2218,7 +2218,7 @@ bool Dht::cancelPut(const InfoHash& id, const Value::Id& vid) { bool canceled {false}; - auto sr_cancel_put = [&](std::map<InfoHash, std::shared_ptr<Search>> srs) { + auto sr_cancel_put = [&](std::map<InfoHash, Sp<Search>> srs) { auto srp = srs.find(id); if (srp == srs.end()) return; @@ -2247,9 +2247,9 @@ Dht::storageChanged(const InfoHash& id, Storage& st, ValueStorage& v) DHT_LOG.d(id, "[store %s] changed", id.toString().c_str()); if (not st.local_listeners.empty()) { DHT_LOG.d(id, "[store %s] %lu local listeners", id.toString().c_str(), st.local_listeners.size()); - std::vector<std::pair<GetCallback, std::vector<std::shared_ptr<Value>>>> cbs; + std::vector<std::pair<GetCallback, std::vector<Sp<Value>>>> cbs; for (const auto& l : st.local_listeners) { - std::vector<std::shared_ptr<Value>> vals; + std::vector<Sp<Value>> vals; if (not l.second.filter or l.second.filter(*v.data)) vals.push_back(v.data); if (not vals.empty()) { @@ -2273,7 +2273,7 @@ Dht::storageChanged(const InfoHash& id, Storage& st, ValueStorage& v) DHT_LOG.w(id, node_listeners.first->id, "[store %s] [node %s] sending update", id.toString().c_str(), node_listeners.first->toString().c_str()); - std::vector<std::shared_ptr<Value>> vals {}; + std::vector<Sp<Value>> vals {}; vals.push_back(v.data); Blob ntoken = makeToken((const sockaddr*)&node_listeners.first->addr.first, false); network_engine.tellListener(node_listeners.first, l.second.sid, id, 0, ntoken, {}, {}, @@ -2283,7 +2283,7 @@ Dht::storageChanged(const InfoHash& id, Storage& st, ValueStorage& v) } bool -Dht::storageStore(const InfoHash& id, const std::shared_ptr<Value>& value, time_point created, const SockAddr* sa) +Dht::storageStore(const InfoHash& id, const Sp<Value>& value, time_point created, const SockAddr* sa) { const auto& now = scheduler.time(); created = std::min(created, now); @@ -2321,7 +2321,7 @@ Dht::storageStore(const InfoHash& id, const std::shared_ptr<Value>& value, time_ } std::pair<Dht::ValueStorage*, Dht::Storage::StoreDiff> -Dht::Storage::store(const InfoHash& id, const std::shared_ptr<Value>& value, time_point created, time_point expiration, StorageBucket* sb) +Dht::Storage::store(const InfoHash& id, const Sp<Value>& value, time_point created, time_point expiration, StorageBucket* sb) { auto it = std::find_if (values.begin(), values.end(), [&](const ValueStorage& vr) { return vr.data == value || vr.data->id == value->id; @@ -2388,7 +2388,7 @@ Dht::Storage::clear() } void -Dht::storageAddListener(const InfoHash& id, const std::shared_ptr<Node>& node, size_t socket_id, Query&& query) +Dht::storageAddListener(const InfoHash& id, const Sp<Node>& node, size_t socket_id, Query&& query) { const auto& now = scheduler.time(); auto st = store.find(id); @@ -2748,7 +2748,7 @@ Dht::dumpTables() const for (const auto& b : buckets6) dumpBucket(b, out); - auto dump_searches = [&](std::map<InfoHash, std::shared_ptr<Search>> srs) { + auto dump_searches = [&](std::map<InfoHash, Sp<Search>> srs) { for (auto& srp : srs) dumpSearch(*srp.second, out); }; @@ -3293,7 +3293,7 @@ Dht::pingNode(const sockaddr* sa, socklen_t salen, DoneCallbackSimple&& cb) } void -Dht::onError(std::shared_ptr<net::Request> req, net::DhtProtocolException e) { +Dht::onError(Sp<net::Request> req, net::DhtProtocolException e) { if (e.getCode() == net::DhtProtocolException::UNAUTHORIZED) { DHT_LOG.e(req->node->id, "[node %s] token flush", req->node->toString().c_str()); req->node->authError(); @@ -3324,13 +3324,13 @@ Dht::onReportedAddr(const InfoHash& id, const SockAddr& addr) } net::NetworkEngine::RequestAnswer -Dht::onPing(std::shared_ptr<Node>) +Dht::onPing(Sp<Node>) { return {}; } net::NetworkEngine::RequestAnswer -Dht::onFindNode(std::shared_ptr<Node> node, const InfoHash& target, want_t want) +Dht::onFindNode(Sp<Node> node, const InfoHash& target, want_t want) { const auto& now = scheduler.time(); net::NetworkEngine::RequestAnswer answer; @@ -3343,7 +3343,7 @@ Dht::onFindNode(std::shared_ptr<Node> node, const InfoHash& target, want_t want) } net::NetworkEngine::RequestAnswer -Dht::onGetValues(std::shared_ptr<Node> node, const InfoHash& hash, want_t, const Query& query) +Dht::onGetValues(Sp<Node> node, const InfoHash& hash, want_t, const Query& query) { if (hash == zeroes) { DHT_LOG.w("[node %s] Eek! Got get_values with no info_hash", node->toString().c_str()); @@ -3367,10 +3367,10 @@ Dht::onGetValues(std::shared_ptr<Node> node, const InfoHash& hash, want_t, const return answer; } -void Dht::onGetValuesDone(const std::shared_ptr<Node>& node, +void Dht::onGetValuesDone(const Sp<Node>& node, net::NetworkEngine::RequestAnswer& a, - std::shared_ptr<Search>& sr, - const std::shared_ptr<Query>& orig_query) + Sp<Search>& sr, + const Sp<Query>& orig_query) { if (not sr) { DHT_LOG.w("[search unknown] got reply to 'get'. Ignoring."); @@ -3395,14 +3395,14 @@ void Dht::onGetValuesDone(const std::shared_ptr<Node>& node, if (not a.fields.empty()) { get.query_cb(a.fields); } else if (not a.values.empty()) { - std::vector<std::shared_ptr<FieldValueIndex>> fields; + std::vector<Sp<FieldValueIndex>> fields; fields.reserve(a.values.size()); for (const auto& v : a.values) fields.emplace_back(std::make_shared<FieldValueIndex>(*v, orig_query ? orig_query->select : Select {})); get.query_cb(fields); } } else if (get.get_cb) { /* in case of a vanilla get request */ - std::vector<std::shared_ptr<Value>> tmp; + std::vector<Sp<Value>> tmp; for (const auto& v : a.values) if (not get.filter or get.filter(*v)) tmp.emplace_back(v); @@ -3412,11 +3412,11 @@ void Dht::onGetValuesDone(const std::shared_ptr<Node>& node, } /* callbacks for local search listeners */ - std::vector<std::pair<GetCallback, std::vector<std::shared_ptr<Value>>>> tmp_lists; + std::vector<std::pair<GetCallback, std::vector<Sp<Value>>>> tmp_lists; for (auto& l : sr->listeners) { if (!l.second.get_cb or (orig_query and l.second.query and not l.second.query->isSatisfiedBy(*orig_query))) continue; - std::vector<std::shared_ptr<Value>> tmp; + std::vector<Sp<Value>> tmp; for (const auto& v : a.values) if (not l.second.filter or l.second.filter(*v)) tmp.emplace_back(v); @@ -3440,7 +3440,7 @@ void Dht::onGetValuesDone(const std::shared_ptr<Node>& node, } net::NetworkEngine::RequestAnswer -Dht::onListen(std::shared_ptr<Node> node, const InfoHash& hash, const Blob& token, size_t socket_id, const Query& query) +Dht::onListen(Sp<Node> node, const InfoHash& hash, const Blob& token, size_t socket_id, const Query& query) { if (hash == zeroes) { DHT_LOG.w(node->id, "[node %s] listen with no info_hash", node->toString().c_str()); @@ -3459,9 +3459,9 @@ Dht::onListen(std::shared_ptr<Node> node, const InfoHash& hash, const Blob& toke } void -Dht::onListenDone(const std::shared_ptr<Node>& node, +Dht::onListenDone(const Sp<Node>& node, net::NetworkEngine::RequestAnswer& answer, - std::shared_ptr<Search>& sr) + Sp<Search>& sr) { DHT_LOG.d(sr->id, node->id, "[search %s] [node %s] got listen confirmation", sr->id.toString().c_str(), node->toString().c_str(), answer.values.size()); @@ -3474,10 +3474,10 @@ Dht::onListenDone(const std::shared_ptr<Node>& node, } net::NetworkEngine::RequestAnswer -Dht::onAnnounce(std::shared_ptr<Node> node, +Dht::onAnnounce(Sp<Node> node, const InfoHash& hash, const Blob& token, - const std::vector<std::shared_ptr<Value>>& values, + const std::vector<Sp<Value>>& values, const time_point& creation_date) { if (hash == zeroes) { @@ -3511,7 +3511,7 @@ Dht::onAnnounce(std::shared_ptr<Node> node, }; } auto lv = getLocalById(hash, v->id); - std::shared_ptr<Value> vc = v; + Sp<Value> vc = v; if (lv) { if (*lv == *vc) { DHT_LOG.w(hash, node->id, "[store %s] nothing to do for %s", hash.toString().c_str(), lv->toString().c_str()); @@ -3542,7 +3542,7 @@ Dht::onAnnounce(std::shared_ptr<Node> node, } net::NetworkEngine::RequestAnswer -Dht::onRefresh(std::shared_ptr<Node> node, const InfoHash& hash, const Blob& token, const Value::Id& vid) +Dht::onRefresh(Sp<Node> node, const InfoHash& hash, const Blob& token, const Value::Id& vid) { using namespace net; @@ -3564,7 +3564,7 @@ Dht::onRefresh(std::shared_ptr<Node> node, const InfoHash& hash, const Blob& tok } void -Dht::onAnnounceDone(const std::shared_ptr<Node>& node, net::NetworkEngine::RequestAnswer& answer, std::shared_ptr<Search>& sr) +Dht::onAnnounceDone(const Sp<Node>& node, net::NetworkEngine::RequestAnswer& answer, Sp<Search>& sr) { DHT_LOG.d(sr->id, node->id, "[search %s] [node %s] got reply to put!", sr->id.toString().c_str(), node->toString().c_str()); diff --git a/src/network_engine.cpp b/src/network_engine.cpp index bac57905bcb4e3a887c00a86efe656bb86cbea0b..aeaf8925703687d51aa197377315860a707e5e3d 100644 --- a/src/network_engine.cpp +++ b/src/network_engine.cpp @@ -92,11 +92,11 @@ struct ParsedMessage { time_point created { time_point::max() }; /* IPv4 nodes in response to a 'find' request */ Blob nodes4_raw, nodes6_raw; - std::vector<std::shared_ptr<Node>> nodes4, nodes6; + std::vector<Sp<Node>> nodes4, nodes6; /* values to store or retreive request */ - std::vector<std::shared_ptr<Value>> values; + std::vector<Sp<Value>> values; /* index for fields values */ - std::vector<std::shared_ptr<FieldValueIndex>> fields; + std::vector<Sp<FieldValueIndex>> fields; /** When part of the message header: {index -> (total size, {})} * When part of partial value data: {index -> (offset, part_data)} */ std::map<unsigned, std::pair<unsigned, Blob>> value_parts; @@ -123,7 +123,7 @@ struct NetworkEngine::PartialMessage { }; std::vector<Blob> -serializeValues(const std::vector<std::shared_ptr<Value>>& st) +serializeValues(const std::vector<Sp<Value>>& st) { std::vector<Blob> svals; svals.reserve(st.size()); @@ -159,9 +159,9 @@ NetworkEngine::~NetworkEngine() { } void -NetworkEngine::tellListener(std::shared_ptr<Node> node, uint32_t socket_id, const InfoHash& hash, want_t want, - const Blob& ntoken, std::vector<std::shared_ptr<Node>>&& nodes, - std::vector<std::shared_ptr<Node>>&& nodes6, std::vector<std::shared_ptr<Value>>&& values, +NetworkEngine::tellListener(Sp<Node> node, uint32_t socket_id, const InfoHash& hash, want_t want, + const Blob& ntoken, std::vector<Sp<Node>>&& nodes, + std::vector<Sp<Node>>&& nodes6, std::vector<Sp<Value>>&& values, const Query& query) { auto nnodes = bufferNodes(node->getFamily(), hash, want, nodes, nodes6); @@ -187,8 +187,8 @@ NetworkEngine::isRunning(sa_family_t af) const } } -std::shared_ptr<Socket> -NetworkEngine::openSocket(const std::shared_ptr<Node>& node, TransPrefix tp, SocketCb&& cb) +Sp<Socket> +NetworkEngine::openSocket(const Sp<Node>& node, TransPrefix tp, SocketCb&& cb) { auto tid = TransId {tp, getNewTid()}; auto s = opened_sockets.emplace(tid, std::make_shared<Socket>(node, tid, cb)); @@ -198,14 +198,14 @@ NetworkEngine::openSocket(const std::shared_ptr<Node>& node, TransPrefix tp, Soc } void -NetworkEngine::closeSocket(std::shared_ptr<Socket> socket) +NetworkEngine::closeSocket(Sp<Socket> socket) { if (socket) opened_sockets.erase(socket->id); } void -NetworkEngine::cancelRequest(std::shared_ptr<Request>& req) +NetworkEngine::cancelRequest(Sp<Request>& req) { if (req) { req->cancel(); @@ -229,7 +229,7 @@ NetworkEngine::connectivityChanged(sa_family_t af) } void -NetworkEngine::requestStep(std::shared_ptr<Request> sreq) +NetworkEngine::requestStep(Sp<Request> sreq) { auto& req = *sreq; if (not req.pending()) { @@ -266,7 +266,7 @@ NetworkEngine::requestStep(std::shared_ptr<Request> sreq) * be made before the request expires. */ void -NetworkEngine::sendRequest(std::shared_ptr<Request>& request) +NetworkEngine::sendRequest(Sp<Request>& request) { request->start = scheduler.time(); auto e = requests.emplace(request->tid, request); @@ -341,7 +341,7 @@ NetworkEngine::isMartian(const SockAddr& addr) /* The internal blacklist is an LRU cache of nodes that have sent incorrect messages. */ void -NetworkEngine::blacklistNode(const std::shared_ptr<Node>& n) +NetworkEngine::blacklistNode(const Sp<Node>& n) { n->setExpired(); for (auto rit = requests.begin(); rit != requests.end();) { @@ -631,8 +631,8 @@ NetworkEngine::send(const char *buf, size_t len, int flags, const SockAddr& addr return sendto(s, buf, len, flags, (const sockaddr*)&addr.first, addr.second); } -std::shared_ptr<Request> -NetworkEngine::sendPing(std::shared_ptr<Node> node, RequestCb&& on_done, RequestExpiredCb&& on_expired) { +Sp<Request> +NetworkEngine::sendPing(Sp<Node> node, RequestCb&& on_done, RequestExpiredCb&& on_expired) { auto tid = TransId {TransPrefix::PING, getNewTid()}; msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); @@ -651,7 +651,7 @@ NetworkEngine::sendPing(std::shared_ptr<Node> node, RequestCb&& on_done, Request } Blob b {buffer.data(), buffer.data() + buffer.size()}; - std::shared_ptr<Request> req(new Request {tid, node, std::move(b), + Sp<Request> req(new Request {tid, node, std::move(b), [=](const Request& req_status, ParsedMessage&&) { DHT_LOG.d(req_status.node->id, "[node %s] got pong !", req_status.node->toString().c_str()); if (on_done) { @@ -690,8 +690,8 @@ NetworkEngine::sendPong(const SockAddr& addr, TransId tid) { send(buffer.data(), buffer.size(), 0, addr); } -std::shared_ptr<Request> -NetworkEngine::sendFindNode(std::shared_ptr<Node> n, const InfoHash& target, want_t want, +Sp<Request> +NetworkEngine::sendFindNode(Sp<Node> n, const InfoHash& target, want_t want, RequestCb&& on_done, RequestExpiredCb&& on_expired) { auto tid = TransId {TransPrefix::FIND_NODE, getNewTid()}; msgpack::sbuffer buffer; @@ -718,7 +718,7 @@ NetworkEngine::sendFindNode(std::shared_ptr<Node> n, const InfoHash& target, wan } Blob b {buffer.data(), buffer.data() + buffer.size()}; - std::shared_ptr<Request> req(new Request {tid, n, std::move(b), + Sp<Request> req(new Request {tid, n, std::move(b), [=](const Request& req_status, ParsedMessage&& msg) { /* on done */ if (on_done) { on_done(req_status, {std::forward<ParsedMessage>(msg)}); @@ -736,8 +736,8 @@ NetworkEngine::sendFindNode(std::shared_ptr<Node> n, const InfoHash& target, wan } -std::shared_ptr<Request> -NetworkEngine::sendGetValues(std::shared_ptr<Node> n, const InfoHash& info_hash, const Query& query, want_t want, +Sp<Request> +NetworkEngine::sendGetValues(Sp<Node> n, const InfoHash& info_hash, const Query& query, want_t want, RequestCb&& on_done, RequestExpiredCb&& on_expired) { auto tid = TransId {TransPrefix::GET_VALUES, getNewTid()}; msgpack::sbuffer buffer; @@ -767,7 +767,7 @@ NetworkEngine::sendGetValues(std::shared_ptr<Node> n, const InfoHash& info_hash, } Blob b {buffer.data(), buffer.data() + buffer.size()}; - std::shared_ptr<Request> req(new Request {tid, n, std::move(b), + Sp<Request> req(new Request {tid, n, std::move(b), [=](const Request& req_status, ParsedMessage&& msg) { /* on done */ if (on_done) { on_done(req_status, {std::forward<ParsedMessage>(msg)}); @@ -828,7 +828,7 @@ NetworkEngine::deserializeNodes(ParsedMessage& msg) { } std::vector<Blob> -NetworkEngine::packValueHeader(msgpack::sbuffer& buffer, const std::vector<std::shared_ptr<Value>>& st) +NetworkEngine::packValueHeader(msgpack::sbuffer& buffer, const std::vector<Sp<Value>>& st) { auto svals = serializeValues(st); size_t total_size = 0; @@ -883,7 +883,7 @@ NetworkEngine::sendValueParts(TransId tid, const std::vector<Blob>& svals, const void NetworkEngine::sendNodesValues(const SockAddr& addr, TransId tid, const Blob& nodes, const Blob& nodes6, - const std::vector<std::shared_ptr<Value>>& st, const Query& query, const Blob& token) + const std::vector<Sp<Value>>& st, const Query& query, const Blob& token) { msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); @@ -940,9 +940,9 @@ NetworkEngine::sendNodesValues(const SockAddr& addr, TransId tid, const Blob& no } Blob -NetworkEngine::bufferNodes(sa_family_t af, const InfoHash& id, std::vector<std::shared_ptr<Node>>& nodes) +NetworkEngine::bufferNodes(sa_family_t af, const InfoHash& id, std::vector<Sp<Node>>& nodes) { - std::sort(nodes.begin(), nodes.end(), [&](const std::shared_ptr<Node>& a, const std::shared_ptr<Node>& b){ + std::sort(nodes.begin(), nodes.end(), [&](const Sp<Node>& a, const Sp<Node>& b){ return id.xorCmp(a->id, b->id) < 0; }); size_t nnode = std::min<size_t>(SEND_NODES, nodes.size()); @@ -975,7 +975,7 @@ NetworkEngine::bufferNodes(sa_family_t af, const InfoHash& id, std::vector<std:: std::pair<Blob, Blob> NetworkEngine::bufferNodes(sa_family_t af, const InfoHash& id, want_t want, - std::vector<std::shared_ptr<Node>>& nodes4, std::vector<std::shared_ptr<Node>>& nodes6) + std::vector<Sp<Node>>& nodes4, std::vector<Sp<Node>>& nodes6) { if (want < 0) want = af == AF_INET ? WANT4 : WANT6; @@ -991,17 +991,17 @@ NetworkEngine::bufferNodes(sa_family_t af, const InfoHash& id, want_t want, return {std::move(bnodes4), std::move(bnodes6)}; } -std::shared_ptr<Request> -NetworkEngine::sendListen(std::shared_ptr<Node> n, +Sp<Request> +NetworkEngine::sendListen(Sp<Node> n, const InfoHash& hash, const Query& query, const Blob& token, - std::shared_ptr<Request> previous, + Sp<Request> previous, RequestCb&& on_done, RequestExpiredCb&& on_expired, SocketCb&& socket_cb) { - std::shared_ptr<Socket> socket; + Sp<Socket> socket; auto tid = TransId { TransPrefix::LISTEN, previous ? previous->tid.getTid() : getNewTid() }; if (previous and previous->node == n) { socket = previous->socket; @@ -1046,7 +1046,7 @@ NetworkEngine::sendListen(std::shared_ptr<Node> n, } Blob b {buffer.data(), buffer.data() + buffer.size()}; - std::shared_ptr<Request> req(new Request {tid, n, std::move(b), + Sp<Request> req(new Request {tid, n, std::move(b), [=](const Request& req_status, ParsedMessage&& msg) { /* on done */ if (on_done) on_done(req_status, {std::forward<ParsedMessage>(msg)}); @@ -1083,10 +1083,10 @@ NetworkEngine::sendListenConfirmation(const SockAddr& addr, TransId tid) { send(buffer.data(), buffer.size(), 0, addr); } -std::shared_ptr<Request> -NetworkEngine::sendAnnounceValue(std::shared_ptr<Node> n, +Sp<Request> +NetworkEngine::sendAnnounceValue(Sp<Node> n, const InfoHash& infohash, - const std::shared_ptr<Value>& value, + const Sp<Value>& value, time_point created, const Blob& token, RequestCb&& on_done, @@ -1117,7 +1117,7 @@ NetworkEngine::sendAnnounceValue(std::shared_ptr<Node> n, } Blob b {buffer.data(), buffer.data() + buffer.size()}; - std::shared_ptr<Request> req(new Request {tid, n, std::move(b), + Sp<Request> req(new Request {tid, n, std::move(b), [=](const Request& req_status, ParsedMessage&& msg) { /* on done */ if (msg.value_id == Value::INVALID_ID) { DHT_LOG.d(infohash, "Unknown search or announce!"); @@ -1142,8 +1142,8 @@ NetworkEngine::sendAnnounceValue(std::shared_ptr<Node> n, return req; } -std::shared_ptr<Request> -NetworkEngine::sendRefreshValue(std::shared_ptr<Node> n, +Sp<Request> +NetworkEngine::sendRefreshValue(Sp<Node> n, const InfoHash& infohash, const Value::Id& vid, const Blob& token, @@ -1171,7 +1171,7 @@ NetworkEngine::sendRefreshValue(std::shared_ptr<Node> n, } Blob b {buffer.data(), buffer.data() + buffer.size()}; - std::shared_ptr<Request> req(new Request {tid, n, std::move(b), + Sp<Request> req(new Request {tid, n, std::move(b), [=](const Request& req_status, ParsedMessage&& msg) { /* on done */ if (msg.value_id == Value::INVALID_ID) { DHT_LOG.d(infohash, "Unknown search or announce!"); diff --git a/src/routing_table.cpp b/src/routing_table.cpp index 54b08897cdbe326c80d9c70fc35f6a529807a7b2..5c1444c1dd481be508571c93b461e3ae80fda8fd 100644 --- a/src/routing_table.cpp +++ b/src/routing_table.cpp @@ -12,7 +12,7 @@ static std::uniform_int_distribution<int> rand_byte{ 0, std::numeric_limits<uint static std::uniform_int_distribution<uint8_t> rand_byte; #endif -std::shared_ptr<Node> +Sp<Node> Bucket::randomNode() { if (nodes.empty()) @@ -77,10 +77,10 @@ RoutingTable::depth(const RoutingTable::const_iterator& it) const return std::max(bit1, bit2)+1; } -std::vector<std::shared_ptr<Node>> +std::vector<Sp<Node>> RoutingTable::findClosestNodes(const InfoHash id, time_point now, size_t count) const { - std::vector<std::shared_ptr<Node>> nodes {}; + std::vector<Sp<Node>> nodes {}; auto bucket = findBucket(id); if (bucket == end()) { return nodes; } @@ -91,7 +91,7 @@ RoutingTable::findClosestNodes(const InfoHash id, time_point now, size_t count) continue; auto here = std::find_if(nodes.begin(), nodes.end(), - [&id,&n](std::shared_ptr<Node> &node) { + [&id,&n](Sp<Node> &node) { return id.xorCmp(n->id, node->id) < 0; } ); @@ -162,7 +162,7 @@ RoutingTable::split(const RoutingTable::iterator& b) insert(std::next(b), Bucket {b->af, new_id, b->time}); // Re-assign nodes - std::list<std::shared_ptr<Node>> nodes {}; + std::list<Sp<Node>> nodes {}; nodes.splice(nodes.begin(), b->nodes); while (!nodes.empty()) { auto n = nodes.begin(); diff --git a/src/securedht.cpp b/src/securedht.cpp index 88d568ef1030e7c057f10e133706b9228d29743a..d9bcfc5424d3a5209b3366941efa7dc1d63abab2 100644 --- a/src/securedht.cpp +++ b/src/securedht.cpp @@ -81,7 +81,7 @@ SecureDht::~SecureDht() ValueType SecureDht::secureType(ValueType&& type) { - type.storePolicy = [this,type](InfoHash id, std::shared_ptr<Value>& v, InfoHash nid, const sockaddr* a, socklen_t al) { + type.storePolicy = [this,type](InfoHash id, Sp<Value>& v, InfoHash nid, const sockaddr* a, socklen_t al) { if (v->isSigned()) { if (!v->owner or !v->owner->checkSignature(v->getToSign(), v->signature)) { DHT_LOG.WARN("Signature verification failed"); @@ -92,7 +92,7 @@ SecureDht::secureType(ValueType&& type) } return type.storePolicy(id, v, nid, a, al); }; - type.editPolicy = [this,type](InfoHash id, const std::shared_ptr<Value>& o, std::shared_ptr<Value>& n, InfoHash nid, const sockaddr* a, socklen_t al) { + type.editPolicy = [this,type](InfoHash id, const Sp<Value>& o, Sp<Value>& n, InfoHash nid, const sockaddr* a, socklen_t al) { if (!o->isSigned()) return type.editPolicy(id, o, n, nid, a, al); if (o->owner != n->owner) { @@ -118,7 +118,7 @@ SecureDht::secureType(ValueType&& type) return type; } -const std::shared_ptr<crypto::Certificate> +const Sp<crypto::Certificate> SecureDht::getCertificate(const InfoHash& node) const { if (node == getId()) @@ -130,7 +130,7 @@ SecureDht::getCertificate(const InfoHash& node) const return it->second; } -const std::shared_ptr<const crypto::PublicKey> +const Sp<const crypto::PublicKey> SecureDht::getPublicKey(const InfoHash& node) const { if (node == getId()) @@ -142,10 +142,10 @@ SecureDht::getPublicKey(const InfoHash& node) const return it->second; } -const std::shared_ptr<crypto::Certificate> +const Sp<crypto::Certificate> SecureDht::registerCertificate(const InfoHash& node, const Blob& data) { - std::shared_ptr<crypto::Certificate> crt; + Sp<crypto::Certificate> crt; try { crt = std::make_shared<crypto::Certificate>(data); } catch (const std::exception& e) { @@ -167,16 +167,16 @@ SecureDht::registerCertificate(const InfoHash& node, const Blob& data) } void -SecureDht::registerCertificate(std::shared_ptr<crypto::Certificate>& cert) +SecureDht::registerCertificate(Sp<crypto::Certificate>& cert) { if (cert) nodesCertificates_[cert->getId()] = cert; } void -SecureDht::findCertificate(const InfoHash& node, std::function<void(const std::shared_ptr<crypto::Certificate>)> cb) +SecureDht::findCertificate(const InfoHash& node, std::function<void(const Sp<crypto::Certificate>)> cb) { - std::shared_ptr<crypto::Certificate> b = getCertificate(node); + Sp<crypto::Certificate> b = getCertificate(node); if (b && *b) { DHT_LOG.DEBUG("Using certificate from cache for %s", node.toString().c_str()); if (cb) @@ -195,7 +195,7 @@ SecureDht::findCertificate(const InfoHash& node, std::function<void(const std::s } auto found = std::make_shared<bool>(false); - Dht::get(node, [cb,node,found,this](const std::vector<std::shared_ptr<Value>>& vals) { + Dht::get(node, [cb,node,found,this](const std::vector<Sp<Value>>& vals) { if (*found) return false; for (const auto& v : vals) { @@ -215,7 +215,7 @@ SecureDht::findCertificate(const InfoHash& node, std::function<void(const std::s } void -SecureDht::findPublicKey(const InfoHash& node, std::function<void(const std::shared_ptr<const crypto::PublicKey>)> cb) +SecureDht::findPublicKey(const InfoHash& node, std::function<void(const Sp<const crypto::PublicKey>)> cb) { auto pk = getPublicKey(node); if (pk && *pk) { @@ -224,7 +224,7 @@ SecureDht::findPublicKey(const InfoHash& node, std::function<void(const std::sha cb(pk); return; } - findCertificate(node, [=](const std::shared_ptr<crypto::Certificate> crt) { + findCertificate(node, [=](const Sp<crypto::Certificate> crt) { if (crt && *crt) { auto pk = std::make_shared<crypto::PublicKey>(crt->getPublicKey()); nodesPubKeys_[pk->getId()] = pk; @@ -238,8 +238,8 @@ SecureDht::findPublicKey(const InfoHash& node, std::function<void(const std::sha GetCallback SecureDht::getCallbackFilter(GetCallback cb, Value::Filter&& filter) { - return [=](const std::vector<std::shared_ptr<Value>>& values) { - std::vector<std::shared_ptr<Value>> tmpvals {}; + return [=](const std::vector<Sp<Value>>& values) { + std::vector<Sp<Value>> tmpvals {}; for (const auto& v : values) { // Decrypt encrypted values if (v->isEncrypted()) { @@ -292,7 +292,7 @@ SecureDht::listen(const InfoHash& id, GetCallback cb, Value::Filter&& f, Where&& } void -SecureDht::putSigned(const InfoHash& hash, std::shared_ptr<Value> val, DoneCallback callback, bool permanent) +SecureDht::putSigned(const InfoHash& hash, Sp<Value> val, DoneCallback callback, bool permanent) { if (val->id == Value::INVALID_ID) { crypto::random_device rdev; @@ -308,7 +308,7 @@ SecureDht::putSigned(const InfoHash& hash, std::shared_ptr<Value> val, DoneCallb // Check if data already exists on the dht get(hash, - [val,this] (const std::vector<std::shared_ptr<Value>>& vals) { + [val,this] (const std::vector<Sp<Value>>& vals) { DHT_LOG.DEBUG("Found online previous value being announced."); for (const auto& v : vals) { if (!v->isSigned()) @@ -329,9 +329,9 @@ SecureDht::putSigned(const InfoHash& hash, std::shared_ptr<Value> val, DoneCallb } void -SecureDht::putEncrypted(const InfoHash& hash, const InfoHash& to, std::shared_ptr<Value> val, DoneCallback callback, bool permanent) +SecureDht::putEncrypted(const InfoHash& hash, const InfoHash& to, Sp<Value> val, DoneCallback callback, bool permanent) { - findPublicKey(to, [=](const std::shared_ptr<const crypto::PublicKey> pk) { + findPublicKey(to, [=](const Sp<const crypto::PublicKey> pk) { if(!pk || !*pk) { if (callback) callback(false, {});