diff --git a/include/opendht/dht.h b/include/opendht/dht.h index 847eebf2e039e965cdae1c3a37446ed97c69abea..570360cea7ca722c94c5f10ea0e3979f9b263695 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -69,6 +69,8 @@ public: socklen_t sslen; }; + typedef int_fast8_t want_t; + Dht() {} /** @@ -169,6 +171,16 @@ public: */ bool cancelPut(const InfoHash&, const Value::Id&); + /** + * Listen for any changes involving a specified hash. + * The node will register to receive updates from relevent nodes when + * new values are added or removed. + * + * @return a token to cancel the listener later. + */ + size_t listen(const InfoHash&, GetCallback, Value::Filter = Value::AllFilter()); + bool cancelListen(const InfoHash&, size_t token); + /** * Get the list of good nodes for local storage saving purposes * The list is ordered to minimize the back-to-work delay. @@ -293,14 +305,14 @@ private: SearchNode() : ss() {} SearchNode(const InfoHash& id) : id(id), ss() {} - struct AnnounceStatus { + struct RequestStatus { time_t request_time; /* the time of the last unanswered announce request */ time_t reply_time; /* the time of the last announce confirmation */ }; - typedef std::map<Value::Id, AnnounceStatus> AnnounceStatusMap; + typedef std::map<Value::Id, RequestStatus> AnnounceStatusMap; /** - * Can we use this node to announce ? + * Can we use this node to listen/announce ? */ bool isSynced(time_t now) const { return /*pinged < 3 && replied &&*/ reply_time > now - 15 * 60; @@ -321,6 +333,9 @@ private: time_t request_time {0}; /* the time of the last unanswered request */ time_t reply_time {0}; /* the time of the last reply with a token */ unsigned pinged {0}; + + RequestStatus listenStatus {0, 0}; + Blob token {}; AnnounceStatusMap acked {}; /* announcement status for a given value id */ @@ -345,15 +360,22 @@ private: * - Announcing (Some announces not performed on all nodes) */ struct Search { - uint16_t tid; + InfoHash id {}; sa_family_t af; + + uint16_t tid; time_t step_time {0}; /* the time of the last search_step */ - InfoHash id {}; + + bool done {false}; + std::vector<SearchNode> nodes {SEARCH_NODES+1}; + std::vector<Announce> announce {}; + std::vector<std::pair<Value::Filter, GetCallback>> callbacks {}; DoneCallback done_callback {nullptr}; - bool done {false}; - std::vector<SearchNode> nodes {SEARCH_NODES+1}; + + std::map<size_t, std::pair<Value::Filter, GetCallback>> listeners {}; + size_t listener_token = 1; bool insertNode(const InfoHash& id, const sockaddr*, socklen_t, time_t now, bool confirmed=false, const Blob& token={}); void insertBucket(const Bucket&, time_t now); @@ -364,18 +386,16 @@ private: bool isSynced(time_t now) const; /** - * Are all values that are registred for announcement announced ? + * ret = 0 : no announce required. + * ret > 0 : (re-)announce required at time ret. */ - bool isAnnounced(const std::map<ValueType::Id, ValueType>& types, time_t now) const { - auto at = getAnnounceTime(types); - return at && at < now; - } + time_t getAnnounceTime(const std::map<ValueType::Id, ValueType>& types) const; /** - * ret = 0 : no announce required. + * ret = 0 : no listen required. * ret > 0 : (re-)announce required at time ret. */ - time_t getAnnounceTime(const std::map<ValueType::Id, ValueType>& types) const; + time_t getListenTime(time_t now) const; time_t getNextStepTime(const std::map<ValueType::Id, ValueType>& types, time_t now) const; }; @@ -388,9 +408,26 @@ private: ValueStorage(const std::shared_ptr<Value>& v, time_t t) : data(v), time(t) {} }; + /** + * Foreign nodes asking for updates about an InfoHash. + */ + struct Listener { + InfoHash id {}; + sockaddr_storage ss; + socklen_t sslen {}; + time_t time {}; + uint16_t tid; + + Listener() : ss(), sslen() {} + Listener(const InfoHash& id, const sockaddr *from, socklen_t fromlen, uint16_t tid) : id(id), sslen(fromlen), tid(tid) { + memcpy(&ss, from, fromlen); + } + }; + struct Storage { InfoHash id; std::vector<ValueStorage> values; + std::vector<Listener> listeners; }; enum class MessageType { @@ -399,7 +436,8 @@ private: Ping, FindNode, GetValues, - AnnounceValue + AnnounceValue, + Listen }; struct TransPrefix : public std::array<uint8_t, 2> { @@ -408,6 +446,7 @@ private: static const TransPrefix FIND_NODE; static const TransPrefix GET_VALUES; static const TransPrefix ANNOUNCE_VALUES; + static const TransPrefix LISTEN; }; /* Transaction-ids are 4-bytes long, with the first two bytes identifying @@ -446,18 +485,16 @@ private: Dht(const Dht&) = delete; Dht& operator=(const Dht&) = delete; + // socket descriptors int dht_socket {-1}; int dht_socket6 {-1}; - time_t search_time {0}; - time_t confirm_nodes_time {0}; - time_t rotate_secrets_time {0}; - InfoHash myid {}; static const uint8_t my_v[9]; std::array<uint8_t, 8> secret {{}}; std::array<uint8_t, 8> oldsecret {{}}; + // registred types std::map<ValueType::Id, ValueType> types; // the stuff @@ -466,6 +503,8 @@ private: std::vector<Storage> store {}; std::list<Search> searches {}; uint16_t search_id {0}; + std::map<size_t, std::pair<size_t, size_t>> listeners {}; + size_t listener_token {0}; sockaddr_storage blacklist[BLACKLISTED_MAX] {}; unsigned next_blacklisted = 0; @@ -473,8 +512,12 @@ private: struct timeval now {0, 0}; time_t mybucket_grow_time {0}, mybucket6_grow_time {0}; time_t expire_stuff_time {0}; + time_t search_time {0}; + time_t confirm_nodes_time {0}; + time_t rotate_secrets_time {0}; time_t rate_limit_time {0}; + // remaining requests for this second long unsigned rate_limit_tokens {MAX_REQUESTS_PER_SEC}; // Networking & packet handling @@ -483,7 +526,7 @@ private: int sendPong(const sockaddr*, socklen_t, TransId tid); int sendFindNode(const sockaddr*, socklen_t, TransId tid, - const InfoHash& target, int want, int confirm); + const InfoHash& target, want_t want, int confirm); int sendNodesValues(const sockaddr*, socklen_t, TransId tid, const uint8_t *nodes, unsigned nodes_len, @@ -491,14 +534,19 @@ private: Storage *st, const Blob& token); int sendClosestNodes(const sockaddr*, socklen_t, TransId tid, - const InfoHash& id, int want, const Blob& token={}, + const InfoHash& id, want_t want, const Blob& token={}, Storage *st=nullptr); int sendGetValues(const sockaddr*, socklen_t, TransId tid, - const InfoHash& infohash, int want, int confirm); + const InfoHash& infohash, want_t want, int confirm); + + int sendListen(const sockaddr*, socklen_t, TransId, + const InfoHash&, const Blob& token, int confirm); + + int sendListenConfirmation(const sockaddr*, socklen_t, TransId); - int sendAnnounceValue(const sockaddr*, socklen_t, TransId tid, - const InfoHash& infohas, const Value& data, + int sendAnnounceValue(const sockaddr*, socklen_t, TransId, + const InfoHash&, const Value&, const Blob& token, int confirm); int sendValueAnnounced(const sockaddr*, socklen_t, TransId, Value::Id); @@ -514,7 +562,7 @@ private: uint8_t *nodes_return, unsigned *nodes_len, uint8_t *nodes6_return, unsigned *nodes6_len, std::vector<std::shared_ptr<Value>>& values_return, - int *want_return, uint16_t& error_code); + want_t* want_return, uint16_t& error_code); void rotateSecrets(); @@ -527,8 +575,10 @@ private: return const_cast<Dht*>(this)->findStorage(id); } + void storageAddListener(const InfoHash& id, const InfoHash& node, const sockaddr *from, socklen_t fromlen, uint16_t tid); ValueStorage* storageStore(const InfoHash& id, const std::shared_ptr<Value>& value); void expireStorage(); + void storageChanged(Storage& st); // Buckets Bucket* findBucket(const InfoHash& id, sa_family_t af) { @@ -575,6 +625,7 @@ private: */ Search* search(const InfoHash& id, sa_family_t af, GetCallback = nullptr, DoneCallback = nullptr, Value::Filter = Value::AllFilter()); void announce(const InfoHash& id, sa_family_t af, const std::shared_ptr<Value>& value, DoneCallback callback); + size_t listenTo(const InfoHash& id, sa_family_t af, GetCallback cb, Value::Filter f = Value::AllFilter()); std::list<Search>::iterator newSearch(); void bootstrapSearch(Search& sr); diff --git a/include/opendht/dhtrunner.h b/include/opendht/dhtrunner.h index 09b883edc9c008d8de4fc9872418988755763417..afc11a5114db91838cb6e640af68510fcf99a4ed 100644 --- a/include/opendht/dhtrunner.h +++ b/include/opendht/dhtrunner.h @@ -62,6 +62,9 @@ public: void get(InfoHash hash, Dht::GetCallback vcb, Dht::DoneCallback dcb=nullptr, Value::Filter f = Value::AllFilter()); void get(const std::string& key, Dht::GetCallback vcb, Dht::DoneCallback dcb=nullptr, Value::Filter f = Value::AllFilter()); + void listen(InfoHash hash, Dht::GetCallback vcb, Value::Filter f = Value::AllFilter()); + void listen(const std::string& key, Dht::GetCallback vcb, Value::Filter f = Value::AllFilter()); + void put(InfoHash hash, Value&& value, Dht::DoneCallback cb=nullptr); void put(const std::string& key, Value&& value, Dht::DoneCallback cb=nullptr); @@ -148,6 +151,7 @@ private: // IPC temporary storage std::vector<std::tuple<InfoHash, Dht::GetCallback, Dht::DoneCallback, Value::Filter>> dht_gets {}; + std::vector<std::tuple<InfoHash, Dht::GetCallback, Value::Filter>> dht_listen {}; std::vector<std::tuple<InfoHash, Value, Dht::DoneCallback>> dht_puts {}; std::vector<std::tuple<InfoHash, Value, Dht::DoneCallback>> dht_sputs {}; std::vector<std::tuple<InfoHash, InfoHash, Value, Dht::DoneCallback>> dht_eputs {}; diff --git a/include/opendht/securedht.h b/include/opendht/securedht.h index b435a029d0a383bdf7bf6be6dcb2a457f69a408e..7ed8397699098e8f02e5027a1f45e4f3ca057d71 100644 --- a/include/opendht/securedht.h +++ b/include/opendht/securedht.h @@ -95,6 +95,8 @@ public: */ void get(const InfoHash& id, GetCallback cb, DoneCallback donecb, Value::Filter = Value::AllFilter()); + size_t listen(const InfoHash& id, GetCallback cb, Value::Filter = Value::AllFilter()); + /** * Will take ownership of the value, sign it using our private key and put it in the DHT. */ @@ -129,6 +131,8 @@ private: SecureDht(const SecureDht&) = delete; SecureDht& operator=(const SecureDht&) = delete; + GetCallback getCallbackFilter(GetCallback); + std::shared_ptr<crypto::PrivateKey> key_ {}; std::shared_ptr<crypto::Certificate> certificate_ {}; diff --git a/src/dht.cpp b/src/dht.cpp index 816c25ea1de790dcee9a2859d073bbaa626c8e75..24576c149ec58de446ed4acdf5d843bf574c3313 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -111,6 +111,7 @@ const Dht::TransPrefix Dht::TransPrefix::PING = {"pn"}; const Dht::TransPrefix Dht::TransPrefix::FIND_NODE = {"fn"}; const Dht::TransPrefix Dht::TransPrefix::GET_VALUES = {"gp"}; const Dht::TransPrefix Dht::TransPrefix::ANNOUNCE_VALUES = {"ap"}; +const Dht::TransPrefix Dht::TransPrefix::LISTEN = {"ls"}; const uint8_t Dht::my_v[9] = "1:v4:RNG"; @@ -660,7 +661,40 @@ Dht::searchStep(Search& sr) /* Check if the first 8 live nodes have replied. */ if (sr.isSynced(now.tv_sec)) { - DHT_DEBUG("searchStep (synced)."); + + // true if this node is part of the target nodes cluter. + bool in = sr.id.xorCmp(myid, sr.nodes.back().id) < 0; + + DHT_DEBUG("searchStep (synced%s).", in ? ", in" : ""); + // + /*auto list_t = sr.getListenTime(); + if (list_t && list_t < now.tv_sec) { + DHT_WARN("Send listen %u", sr.listeners.size()); + for (const auto& n : sr.nodes) { + if (n.pinged >= 3 && !n.isSynced(now.tv_sec)) + continue; + sendListen((sockaddr*)&n.ss, sizeof(sockaddr_storage), TransId {TransPrefix::LISTEN, sr.tid}, sr.id, n.token, n.reply_time >= now.tv_sec - 15); + sr.listen_time = now.tv_sec; + break; + } + }*/ + + if (not sr.listeners.empty()) { + DHT_WARN("Send listen %u", sr.listeners.size()); + unsigned i = 0; + for (auto& n : sr.nodes) { + if (n.pinged >= 3) + continue; + if (n.listenStatus.reply_time + 15*60 < now.tv_sec && n.listenStatus.request_time + 15 < now.tv_sec) { + sendListen((sockaddr*)&n.ss, sizeof(sockaddr_storage), TransId {TransPrefix::LISTEN, sr.tid}, sr.id, n.token, n.reply_time >= now.tv_sec - 15); + n.listenStatus.request_time = now.tv_sec; + } + if (++i == 2) + break; + } + } + + // Announce requests for (auto& a : sr.announce) { if (!a.value) { continue; @@ -670,6 +704,10 @@ Dht::searchStep(Search& sr) bool all_acked = true; auto vid = a.value->id; const auto& type = getType(a.value->type); + if (in) { + DHT_WARN("Storing local value"); + storageStore(sr.id, a.value); + } for (auto& n : sr.nodes) { if (n.pinged >= 3) continue; @@ -722,13 +760,15 @@ Dht::searchStep(Search& sr) sr.done_callback(true); sr.done_callback = nullptr; } - if (sr.announce.empty()) + if (sr.announce.empty() && sr.listeners.empty()) sr.done = true; } else { DHT_DEBUG("searchStep."); if (sr.step_time + SEARCH_GET_STEP >= now.tv_sec) return; - if (sr.nodes.empty() && sr.announce.empty()) { + + // Listening or announcing requires keeping the cluster up to date. + if (sr.nodes.empty() && sr.announce.empty() && sr.listeners.empty()) { sr.done = true; return; } @@ -814,6 +854,26 @@ Dht::Search::getAnnounceTime(const std::map<ValueType::Id, ValueType>& types) co return ret; } +time_t +Dht::Search::getListenTime(time_t /* now */) const +{ + if (listeners.empty()) return 0; + time_t listen_time = 0; + unsigned i = 0; + for (const auto& sn : nodes) { + if (sn.pinged >= 3) + continue; + time_t next_req_min = sn.listenStatus.request_time + 15; + time_t lt = sn.listenStatus.reply_time ? + std::max(sn.listenStatus.reply_time + 15*60, next_req_min) : + next_req_min; + listen_time = listen_time ? std::min(listen_time, lt) : lt; + if (++i == 2) + break; + } + return listen_time; +} + time_t Dht::Search::getNextStepTime(const std::map<ValueType::Id, ValueType>& types, time_t now) const { @@ -821,7 +881,15 @@ Dht::Search::getNextStepTime(const std::map<ValueType::Id, ValueType>& types, ti return 0; if (!isSynced(now)) return step_time + SEARCH_GET_STEP + 1; - return getAnnounceTime(types); + + auto at = getAnnounceTime(types); + auto lt = getListenTime(now); + if (at && lt) + return std::min(at, lt); + else if (!at) + return lt; + else + return at; } void @@ -926,6 +994,71 @@ Dht::announce(const InfoHash& id, sa_family_t af, const std::shared_ptr<Value>& } } +size_t +Dht::listenTo(const InfoHash& id, sa_family_t af, GetCallback cb, Value::Filter f) +{ + DHT_WARN("listenTo %s", id.toString().c_str()); + auto sri = std::find_if (searches.begin(), searches.end(), [id,af](const Search& s) { + return s.id == id && s.af == af; + }); + Search* sr = (sri == searches.end()) ? search(id, af, cb, nullptr) : &(*sri); + if (!sr) + throw DhtException("Can't create search"); + sr->done = false; + auto token = ++sr->listener_token; + sr->listeners.insert({token, {f, cb}}); + if (not sr->nodes.empty()) { + time_t tm = sr->getNextStepTime(types, now.tv_sec); + if (tm != 0 && (search_time == 0 || search_time > tm)) + search_time = tm; + } + return token; +} + +size_t +Dht::listen(const InfoHash& id, GetCallback cb, Value::Filter f) +{ + auto vals = std::make_shared<std::map<Value::Id, std::shared_ptr<Value>>>(); + auto gcb = [=](const std::vector<std::shared_ptr<Value>>& values) { + std::vector<std::shared_ptr<Value>> newvals {}; + for (const auto& v : values) { + auto it = vals->find(v->id); + if (it == vals->cend() || !(*it->second == *v) ) + newvals.push_back(v); + } + if (!newvals.empty()) { + cb(newvals); + for (const auto& v : newvals) + vals->insert({v->id, v}); + } + return true; + }; + + auto token4 = Dht::listenTo(id, AF_INET, gcb, f); + auto token6 = Dht::listenTo(id, AF_INET6, gcb, f); + auto token = ++listener_token; + listeners.insert({token, {token4, token6}}); + return token; +} + +bool +Dht::cancelListen(const InfoHash& id, size_t token) +{ + auto it = listeners.find(token); + if (it == listeners.end()) + return false; + for (auto& s : searches) { + if (s.id != id) continue; + auto af_token = s.af == AF_INET ? it->second.first : it->second.second; + auto sit = s.listeners.find(af_token); + if (sit == s.listeners.end()) + continue; + s.listeners.erase(sit); + } + listeners.erase(it); + return true; +} + void Dht::put(const InfoHash& id, Value&& value, DoneCallback callback) { @@ -1097,6 +1230,16 @@ Dht::findStorage(const InfoHash& id) return nullptr; } +void +Dht::storageChanged(Storage& st) +{ + for (const auto& l : st.listeners) { + DHT_WARN("Storage changed. Sending update."); + Blob ntoken = makeToken((const sockaddr*)&l.ss, false); + sendClosestNodes((const sockaddr*)&l.ss, l.sslen, TransId {TransPrefix::GET_VALUES, l.tid}, st.id, WANT4 | WANT6, ntoken, &st); + } +} + Dht::ValueStorage* Dht::storageStore(const InfoHash& id, const std::shared_ptr<Value>& value) { @@ -1104,7 +1247,7 @@ Dht::storageStore(const InfoHash& id, const std::shared_ptr<Value>& value) if (!st) { if (store.size() >= MAX_HASHES) return nullptr; - store.push_back(Storage {id, {}}); + store.push_back(Storage {id, {}, {}}); st = &store.back(); } @@ -1117,6 +1260,7 @@ Dht::storageStore(const InfoHash& id, const std::shared_ptr<Value>& value) if (it->data != value) { DHT_DEBUG("Updating %s -> %s", id.toString().c_str(), value->toString().c_str()); it->data = value; + storageChanged(*st); } return &*it; } else { @@ -1124,21 +1268,49 @@ Dht::storageStore(const InfoHash& id, const std::shared_ptr<Value>& value) if (st->values.size() >= MAX_VALUES) return nullptr; st->values.emplace_back(value, now.tv_sec); + storageChanged(*st); return &st->values.back(); } } +void +Dht::storageAddListener(const InfoHash& id, const InfoHash& node, const sockaddr *from, socklen_t fromlen, uint16_t tid) +{ + Storage *st = findStorage(id); + if (!st) { + if (store.size() >= MAX_HASHES) + return; + store.push_back(Storage {id, {}, {}}); + st = &store.back(); + } + st->listeners.emplace_back(node, from, fromlen, tid); +} + void Dht::expireStorage() { auto i = store.begin(); while (i != store.end()) { + // put elements to remove at the end with std::partition, + // and then remove them with std::vector::erase. + i->listeners.erase( + std::partition(i->listeners.begin(), i->listeners.end(), + [&](const Listener& l) + { + bool expired = l.time + 15*60 < now.tv_sec; + if (expired) + DHT_DEBUG("Discarding expired listener %s", l.id.toString().c_str()); + // return false if the element should be removed + return !expired; + }), + i->listeners.end()); + i->values.erase( std::partition(i->values.begin(), i->values.end(), [&](const ValueStorage& v) { - if (!v.data) return true; // should not happen + if (!v.data) return false; // should not happen const auto& type = getType(v.data->type); bool expired = v.time + type.expiration < now.tv_sec; if (expired) @@ -1147,7 +1319,7 @@ Dht::expireStorage() }), i->values.end()); - if (i->values.size() == 0) { + if (i->values.empty() && i->listeners.empty()) { DHT_DEBUG("Discarding expired value %s", i->id.toString().c_str()); i = store.erase(i); } @@ -1306,6 +1478,13 @@ Dht::dumpSearch(const Search& sr, std::ostream& out) const else out << " announce at " << at << ", in " << (at-now.tv_sec) << " s."; } + if (synced && not sr.listeners.empty()) { + auto lt = sr.getListenTime(now.tv_sec); + if (lt && lt > now.tv_sec) + out << " [listening]"; + else + out << " listen at " << lt << ", in " << (lt-now.tv_sec) << " s."; + } out << std::endl; for (const auto& n : sr.announce) { @@ -1443,7 +1622,7 @@ Dht::neighbourhoodMaintenance(RoutingTable& list) /* Since our node-id is the same in both DHTs, it's probably profitable to query both families. */ - int want = dht_socket >= 0 && dht_socket6 >= 0 ? (WANT4 | WANT6) : -1; + want_t want = dht_socket >= 0 && dht_socket6 >= 0 ? (WANT4 | WANT6) : -1; Node *n = q->randomNode(); if (n) { DHT_DEBUG("Sending find_node for%s neighborhood maintenance.", q->af == AF_INET6 ? " IPv6" : ""); @@ -1482,7 +1661,7 @@ Dht::bucketMaintenance(RoutingTable& list) Node *n = q->randomNode(); if (n) { - int want = -1; + want_t want = -1; if (dht_socket >= 0 && dht_socket6 >= 0) { auto otherbucket = findBucket(id, q->af == AF_INET ? AF_INET6 : AF_INET); @@ -1532,7 +1711,7 @@ Dht::processMessage(const uint8_t *buf, size_t buflen, const sockaddr *from, soc std::vector<std::shared_ptr<Value>> values; - int want; + want_t want; uint16_t ttid; if (isMartian(from, fromlen)) @@ -1696,10 +1875,29 @@ Dht::processMessage(const uint8_t *buf, size_t buflen, const sockaddr *from, soc /* See comment for gp above. */ searchSendGetValues(*sr); } + } else if (tid.matches(TransPrefix::LISTEN, &ttid)) { + DHT_DEBUG("Got reply to listen."); + Search *sr = findSearch(ttid, from->sa_family); + if (!sr || value_id == Value::INVALID_ID) { + DHT_DEBUG("Unknown search or announce!"); + newNode(id, from, fromlen, 1); + } else { + newNode(id, from, fromlen, 2); + for (auto& sn : sr->nodes) + if (sn.id == id) { + sn.listenStatus.reply_time = now.tv_sec; + //sn.request_time = 0; + sn.pinged = 0; + break; + } + /* See comment for gp above. */ + searchSendGetValues(*sr); + } + } else { DHT_WARN("Unexpected reply: "); DHT_WARN.logPrintable(buf, buflen); - } + } break; case MessageType::Ping: DHT_DEBUG("Got ping (%d)!", tid.length); @@ -1778,6 +1976,25 @@ Dht::processMessage(const uint8_t *buf, size_t buflen, const sockaddr *from, soc DHT_DEBUG("Sending announceValue confirmation."); sendValueAnnounced(from, fromlen, tid, v->id); } + break; + case MessageType::Listen: + if (info_hash == zeroes) { + DHT_WARN("Listen with no info_hash."); + sendError(from, fromlen, tid, 203, "Listen with no info_hash"); + break; + } + if (!tokenMatch(token, from)) { + DHT_WARN("Incorrect token %s for announce_values.", to_hex(token.data(), token.size()).c_str()); + sendError(from, fromlen, tid, 401, "Listen with wrong token"); + break; + } + if (!tid.matches(TransPrefix::LISTEN, &ttid)) { + break; + } + newNode(id, from, fromlen, 1); + storageAddListener(info_hash, id, from, fromlen, ttid); + sendListenConfirmation(from, fromlen, tid); + break; } } @@ -2030,7 +2247,7 @@ Dht::sendPong(const sockaddr *sa, socklen_t salen, TransId tid) int Dht::sendFindNode(const sockaddr *sa, socklen_t salen, TransId tid, - const InfoHash& target, int want, int confirm) + const InfoHash& target, want_t want, int confirm) { constexpr const size_t BUF_SZ = 512; char buf[BUF_SZ]; @@ -2167,7 +2384,7 @@ Dht::bufferClosestNodes(uint8_t *nodes, unsigned numnodes, const InfoHash& id, c int Dht::sendClosestNodes(const sockaddr *sa, socklen_t salen, TransId tid, - const InfoHash& id, int want, const Blob& token, Storage *st) + const InfoHash& id, want_t want, const Blob& token, Storage *st) { uint8_t nodes[8 * 26]; uint8_t nodes6[8 * 38]; @@ -2213,9 +2430,9 @@ Dht::sendClosestNodes(const sockaddr *sa, socklen_t salen, TransId tid, int Dht::sendGetValues(const sockaddr *sa, socklen_t salen, TransId tid, const InfoHash& infohash, - int want, int confirm) + want_t want, int confirm) { - const size_t BUF_SZ = 2048 * 4; + static constexpr const size_t BUF_SZ = 2048 * 4; char buf[BUF_SZ]; size_t i = 0; int rc; @@ -2238,17 +2455,56 @@ Dht::sendGetValues(const sockaddr *sa, socklen_t salen, return send(buf, i, confirm ? MSG_CONFIRM : 0, sa, salen); } +int +Dht::sendListen(const sockaddr* sa, socklen_t salen, TransId tid, + const InfoHash& infohash, const Blob& token, int confirm) +{ + static constexpr const size_t BUF_SZ = 2048; + char buf[BUF_SZ]; + size_t i = 0; + int rc; + + rc = snprintf(buf + i, BUF_SZ - i, "d1:ad2:id%lu:", myid.size()); INC(i, rc, BUF_SZ); + COPY(buf, i, myid.data(), myid.size(), BUF_SZ); + rc = snprintf(buf + i, BUF_SZ - i, "9:info_hash%lu:", infohash.size()); INC(i, rc, BUF_SZ); + COPY(buf, i, infohash.data(), infohash.size(), BUF_SZ); + + rc = snprintf(buf + i, BUF_SZ - i, "e5:token%lu:", token.size()); INC(i, rc, BUF_SZ); + COPY(buf, i, token.data(), token.size(), BUF_SZ); + rc = snprintf(buf + i, BUF_SZ - i, "e1:q6:listen1:t%u:", tid.length); INC(i, rc, BUF_SZ); + COPY(buf, i, tid.data(), tid.length, BUF_SZ); + ADD_V(buf, i, BUF_SZ); + rc = snprintf(buf + i, BUF_SZ - i, "1:y1:qe"); INC(i, rc, BUF_SZ); + + return send(buf, i, confirm ? 0 : MSG_CONFIRM, sa, salen); +} + +int +Dht::sendListenConfirmation(const sockaddr* sa, socklen_t salen, TransId tid) +{ + static constexpr const size_t BUF_SZ = 512; + char buf[BUF_SZ]; + int i = 0, rc; + + rc = snprintf(buf + i, BUF_SZ - i, "d1:rd2:id20:"); INC(i, rc, BUF_SZ); + COPY(buf, i, myid.data(), myid.size(), BUF_SZ); + rc = snprintf(buf + i, BUF_SZ - i, "e1:t%u:", tid.length); INC(i, rc, BUF_SZ); + COPY(buf, i, tid.data(), tid.length, BUF_SZ); + ADD_V(buf, i, BUF_SZ); + rc = snprintf(buf + i, BUF_SZ - i, "1:y1:re"); INC(i, rc, BUF_SZ); + return send(buf, i, 0, sa, salen); +} + int Dht::sendAnnounceValue(const sockaddr *sa, socklen_t salen, TransId tid, const InfoHash& infohash, const Value& value, const Blob& token, int confirm) { - const size_t BUF_SZ = 2048 * 4; + constexpr const size_t BUF_SZ = 2048 * 4; char buf[BUF_SZ]; size_t i = 0; - int rc; - rc = snprintf(buf + i, BUF_SZ - i, "d1:ad2:id%lu:", myid.size()); INC(i, rc, BUF_SZ); + int rc = snprintf(buf + i, BUF_SZ - i, "d1:ad2:id%lu:", myid.size()); INC(i, rc, BUF_SZ); COPY(buf, i, myid.data(), myid.size(), BUF_SZ); rc = snprintf(buf + i, BUF_SZ - i, "9:info_hash%lu:", infohash.size()); INC(i, rc, BUF_SZ); COPY(buf, i, infohash.data(), infohash.size(), BUF_SZ); @@ -2314,7 +2570,7 @@ Dht::parseMessage(const uint8_t *buf, size_t buflen, uint8_t *nodes_return, unsigned *nodes_len, uint8_t *nodes6_return, unsigned *nodes6_len, std::vector<std::shared_ptr<Value>>& values_return, - int *want_return, uint16_t& error_code) + want_t* want_return, uint16_t& error_code) { const uint8_t *p; @@ -2485,6 +2741,8 @@ Dht::parseMessage(const uint8_t *buf, size_t buflen, return MessageType::GetValues; if (dht_memmem(buf, buflen, "1:q13:announce_peer", 19)) return MessageType::AnnounceValue; + if (dht_memmem(buf, buflen, "1:q6:listen", 11)) + return MessageType::Listen; throw DhtException("Can't read message type."); } diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index d4293c99e47c7154c9761025cdb5b94f1391d1b8..f9c7cf0a5e9907ffeeab0459c5deb95efc7b0a3c 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -111,6 +111,12 @@ DhtRunner::loop_() } dht_gets.clear(); + for (auto& list : dht_listen) { + std::cout << "Processing listen (" << std::get<0>(list) << ")" << std::endl; + dht->listen(std::get<0>(list), std::move(std::get<1>(list)), std::move(std::get<2>(list))); + } + dht_listen.clear(); + for (auto& put : dht_eputs) { auto& id = std::get<0>(put); auto& val = std::get<2>(put); @@ -267,6 +273,20 @@ DhtRunner::get(const std::string& key, Dht::GetCallback vcb, Dht::DoneCallback d get(InfoHash::get(key), vcb, dcb, f); } +void +DhtRunner::listen(InfoHash hash, Dht::GetCallback vcb, Value::Filter f) +{ + std::unique_lock<std::mutex> lck(storage_mtx); + dht_listen.emplace_back(hash, vcb, f); + cv.notify_all(); +} + +void +DhtRunner::listen(const std::string& key, Dht::GetCallback vcb, Value::Filter f) +{ + listen(InfoHash::get(key), vcb, f); +} + void DhtRunner::put(InfoHash hash, Value&& value, Dht::DoneCallback cb) { diff --git a/src/securedht.cpp b/src/securedht.cpp index d6e5745ad7eb194d2df91044a3005ac976c09976..9a57b40d6cda962129cac01213e9e1965dcd6068 100644 --- a/src/securedht.cpp +++ b/src/securedht.cpp @@ -178,42 +178,55 @@ SecureDht::findCertificate(const InfoHash& node, std::function<void(const std::s } -void -SecureDht::get(const InfoHash& id, GetCallback cb, DoneCallback donecb, Value::Filter filter) +Dht::GetCallback +SecureDht::getCallbackFilter(GetCallback cb) { - Dht::get(id, - [=](const std::vector<std::shared_ptr<Value>>& values) { - std::vector<std::shared_ptr<Value>> tmpvals {}; - for (const auto& v : values) { - if (v->isEncrypted()) { - try { - Value decrypted_val = std::move(decrypt(*v)); - if (decrypted_val.recipient == getId()) { - auto dv = std::make_shared<Value>(std::move(decrypted_val)); - if (dv->owner.checkSignature(dv->getToSign(), dv->signature)) - tmpvals.push_back(v); - else - DHT_WARN("Signature verification failed for %s", id.toString().c_str()); - } - } catch (const std::exception& e) { - DHT_WARN("Could not decrypt value %s at infohash %s", v->toString().c_str(), id.toString().c_str()); - continue; + return [=](const std::vector<std::shared_ptr<Value>>& values) { + std::vector<std::shared_ptr<Value>> tmpvals {}; + for (const auto& v : values) { + // Decrypt encrypted values + if (v->isEncrypted()) { + try { + Value decrypted_val (decrypt(*v)); + if (decrypted_val.recipient == getId()) { + if (decrypted_val.owner.checkSignature(decrypted_val.getToSign(), decrypted_val.signature)) + tmpvals.push_back(std::make_shared<Value>(std::move(decrypted_val))); + else + DHT_WARN("Signature verification failed for %s", v->toString().c_str()); } - } else if (v->isSigned()) { - if (v->owner.checkSignature(v->getToSign(), v->signature)) - tmpvals.push_back(v); - else - DHT_WARN("Signature verification failed for %s", id.toString().c_str()); - } else { - tmpvals.push_back(v); + // Ignore values belonging to other people + } catch (const std::exception& e) { + DHT_WARN("Could not decrypt value %s", v->toString().c_str()); } } - if (not tmpvals.empty()) - cb(tmpvals); - return true; - }, - donecb, - filter); + // Check signed values + else if (v->isSigned()) { + if (v->owner.checkSignature(v->getToSign(), v->signature)) + tmpvals.push_back(v); + else + DHT_WARN("Signature verification failed for %s", v->toString().c_str()); + } + // Forward normal values + else { + tmpvals.push_back(v); + } + } + if (not tmpvals.empty()) + cb(tmpvals); + return true; + }; +} + +void +SecureDht::get(const InfoHash& id, GetCallback cb, DoneCallback donecb, Value::Filter filter) +{ + Dht::get(id, getCallbackFilter(cb), donecb, filter); +} + +size_t +SecureDht::listen(const InfoHash& id, GetCallback cb, Value::Filter filter) +{ + return Dht::listen(id, getCallbackFilter(cb), filter); } void