diff --git a/include/opendht/network_engine.h b/include/opendht/network_engine.h index 9e54b8ee328d1920337a6efdefec398f2453bd77..9355cbc433584f578960239ee875ab0d1dcf2149 100644 --- a/include/opendht/network_engine.h +++ b/include/opendht/network_engine.h @@ -139,7 +139,7 @@ private: /** * Called when an addres is reported from a requested node. * - * @param h: id + * @param h: id * @param saddr_len (type: socklen_t) lenght of the sockaddr struct. */ std::function<void(const InfoHash&, const SockAddr&)> onReportedAddr; @@ -521,10 +521,10 @@ private: }; - int send(msgpack::sbuffer& msg, const Sp<Node>& node); + int send(msgpack::sbuffer& msg, const Node& node, const Sp<TcpSocket>& sock = {}); int sendUDP(msgpack::sbuffer& msg, const SockAddr& addr); - int send(const Blob& msg, int flags, const Sp<Node>& node); + int send(const Blob& msg, int flags, const Node& node); void startTcp(const Sp<TcpSocket>& sock, const Sp<Node>& node = {}); @@ -561,7 +561,8 @@ private: TransId tid, uint16_t code, const std::string& message, - bool include_id=false); + bool include_id=false, + const Sp<TcpSocket>& sock= {}); void deserializeNodes(ParsedMessage& msg, const SockAddr& from); diff --git a/include/opendht/node.h b/include/opendht/node.h index 3af896b7e6e670fd2cdc4324a6ca70a6cf910a82..e23407e207654c2cd93cdcb1ec203d678e51d351 100644 --- a/include/opendht/node.h +++ b/include/opendht/node.h @@ -39,7 +39,7 @@ struct Node { Sp<TcpSocket> sock; InfoHash last_known_pk; - //std::list<crypto::EcSecretKey> + //std::list<crypto::EcSecretKey> time_point time {time_point::min()}; /* last time eared about */ time_point reply_time {time_point::min()}; /* time of last correct reply received */ @@ -61,9 +61,12 @@ struct Node { std::string getAddrStr() const { return addr.toString(); } + SockAddr getAddress() const { + return canStream() ? sock->getPeerAddr() : addr; + } void startTCP() { - + } /** diff --git a/src/dht.cpp b/src/dht.cpp index 27a18e7521e62423dcbf8bdcf9e977804fe68365..01afd9574303e1a056d3136f8e36965b0a79e097 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -2318,7 +2318,7 @@ Dht::onListen(Sp<Node> node, const InfoHash& hash, const Blob& token, size_t soc net::DhtProtocolException::LISTEN_NO_INFOHASH }; } - if (not tokenMatch(token, node->addr)) { + if (not tokenMatch(token, node->getAddress())) { DHT_LOG.w(hash, node->id, "[node %s] incorrect token %s for 'listen'", node->toString().c_str(), hash.toString().c_str()); throw net::DhtProtocolException {net::DhtProtocolException::UNAUTHORIZED, net::DhtProtocolException::LISTEN_WRONG_TOKEN}; } diff --git a/src/network_engine.cpp b/src/network_engine.cpp index 74e9ceeb34dc4c96481058da4f60833c9cee7feb..20527a645dad1088577fd73e6be54f88dc4141b8 100644 --- a/src/network_engine.cpp +++ b/src/network_engine.cpp @@ -189,14 +189,6 @@ NetworkEngine::NetworkEngine(uv_loop_t* loop, const NetworkConfig& config, NetId network(net), DHT_LOG(log), sock(std::make_shared<UdpSocket>(loop)), tcp_sock(std::make_shared<TcpSocket>(loop)), id_key(crypto::EcSecretKey::generate()), scheduler(scheduler) { - if (dht_socket >= 0) { - if (!set_nonblocking(dht_socket, 1)) - throw DhtException("Can't set socket to non-blocking mode"); - } - if (dht_socket6 >= 0) { - if (!set_nonblocking(dht_socket6, 1)) - throw DhtException("Can't set socket to non-blocking mode"); - } transaction_id = std::uniform_int_distribution<decltype(transaction_id)>{1}(rd_device); using namespace std::placeholders; sock->open(config.bind_addr.c_str(), config.bind_port, std::bind(&NetworkEngine::processMessage, this, _1, _2, _3)); @@ -290,25 +282,39 @@ NetworkEngine::requestStep(Sp<Request> sreq) req.on_expired(req, false); } - auto err = send(req.msg, req.msg.size(), - (node.reply_time >= now - UDP_REPLY_TIME) ? 0 : MSG_CONFIRM, - node.addr); - if (err == ENETUNREACH || - err == EHOSTUNREACH || - err == EAFNOSUPPORT) - { - node.setExpired(); - requests.erase(req.tid); - } else { - if (err != EAGAIN) { - ++req.attempt_count; - } + if (node.sock and node.sock->canWrite()) { + auto data = (uint8_t*)malloc(req.msg.size()); + memcpy(data, req.msg.data(), req.msg.size()); req.last_try = now; - std::weak_ptr<Request> wreq = sreq; - scheduler.add(req.last_try + Node::MAX_RESPONSE_TIME, [this,wreq] { - if (auto req = wreq.lock()) - requestStep(req); + ++req.attempt_count; + node.sock->write(data, req.msg.size(), [&,sreq](int status){ + if (status != 0) { + DHT_LOG.w(sreq->node->id, "write failed ! %d", status); + sreq->node->setExpired(); + } }); + } else { + // UDP + auto err = send(req.msg, + (node.reply_time >= now - UDP_REPLY_TIME) ? 0 : MSG_CONFIRM, + node); + if (err == ENETUNREACH || + err == EHOSTUNREACH || + err == EAFNOSUPPORT) + { + node.setExpired(); + requests.erase(req.tid); + } else { + if (err != EAGAIN) { + ++req.attempt_count; + } + req.last_try = now; + std::weak_ptr<Request> wreq = sreq; + scheduler.add(req.last_try + Node::MAX_RESPONSE_TIME, [this,wreq] { + if (auto req = wreq.lock()) + requestStep(req); + }); + } } } @@ -428,8 +434,10 @@ NetworkEngine::startTcp(const Sp<TcpSocket>& sock, const Sp<Node>& node) msgpack::object_handle result; // Message pack data loop while(rx_data->unpacker.next(result)) { - //std::unique_ptr<ParsedMessage> msg {new ParsedMessage}; + std::unique_ptr<ParsedMessage> msg {new ParsedMessage}; SockAddr from = sock->getPeerAddr(); + if (from.isMappedIPv4()) + from = from.getMappedIPv4(); Sp<Node> node_found; try { const auto& o = result.get(); @@ -439,8 +447,8 @@ NetworkEngine::startTcp(const Sp<TcpSocket>& sock, const Sp<Node>& node) //msg->msgpack_unpack(oh.get()); node_found = processEncrypted(oh.get(), from); } else { - //msg->msgpack_unpack(o); - node_found = process(o, from); + msg->msgpack_unpack(o); + node_found = process(std::move(msg), from, sock); } } catch (const std::exception& e) { DHT_LOG.w("Can't process message: %s", e.what()); @@ -589,7 +597,7 @@ NetworkEngine::process(const msgpack::object& o, const SockAddr& from) } Sp<Node> -NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& from, const Sp<TcpSocket>& sock) +NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& from, const Sp<TcpSocket>& tcpsock) { const auto& now = scheduler.time(); Sp<Node> node; @@ -611,7 +619,7 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro node = req ? req->node : rsocket->node; if (node->id != msg->id) { if (node->id == zeroes) // received reply to a message sent when we didn't know the node ID. - node = cache.getNode(msg->id, from, sock, now, true); + node = cache.getNode(msg->id, from, tcpsock, now, true); else { // received reply from unexpected node node->received(now, req); @@ -620,7 +628,7 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro return node; } } else - node->update(from, sock); + node->update(from, tcpsock); node->received(now, req); @@ -669,7 +677,7 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro break; } } else { - node = cache.getNode(msg->id, from, sock, now, true); + node = cache.getNode(msg->id, from, tcpsock, now, true); node->received(now, {}); onNewNode(node, 1); try { @@ -730,7 +738,8 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro } catch (const std::overflow_error& e) { DHT_LOG.e("Can't send value: buffer not large enough !"); } catch (DhtProtocolException& e) { - sendError(node, msg->tid, e.getCode(), e.getMsg().c_str(), true); + DHT_LOG.w("Error during request handling, sending error ! %p", tcpsock.get()); + sendError(node, msg->tid, e.getCode(), e.getMsg().c_str(), true, tcpsock); } } @@ -765,17 +774,18 @@ NetworkEngine::sendUDP(msgpack::sbuffer& msg, const SockAddr& addr) } int -NetworkEngine::send(msgpack::sbuffer& msg, const Sp<Node>& node) +NetworkEngine::send(msgpack::sbuffer& msg, const Node& node, const Sp<TcpSocket>& tcpsock) { + DHT_LOG.d(node.id, "[node %s] send %lu", node.getId().to_c_str(), msg.size()); uint8_t* data; size_t size; - if (node->id == InfoHash{}) { + if (node.id == InfoHash{}) { size = msg.size(); data = (uint8_t*)msg.release(); } else { Blob encrypted_out; { - Blob encrypted = id_key.encrypt((const uint8_t *)msg.data(), msg.size(), node->id); + Blob encrypted = id_key.encrypt((const uint8_t *)msg.data(), msg.size(), node.id); msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); pk.pack_array(2); @@ -783,7 +793,7 @@ NetworkEngine::send(msgpack::sbuffer& msg, const Sp<Node>& node) pk.pack_bin(encrypted.size()); pk.pack_bin_body((const char *)encrypted.data(), encrypted.size()); - encrypted_out = node->id.encrypt((const uint8_t *)buffer.data(), buffer.size()); + encrypted_out = node.id.encrypt((const uint8_t *)buffer.data(), buffer.size()); } msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); @@ -792,26 +802,31 @@ NetworkEngine::send(msgpack::sbuffer& msg, const Sp<Node>& node) size = buffer.size(); data = (uint8_t*)buffer.release(); } - if (node->canStream()) { - return node->sock->write(data, size); + if (tcpsock) { + return tcpsock->write(data, size); + } else if (node.canStream()) { + DHT_LOG.d(node.id, "[node %s] send (stream) %lu", node.getId().to_c_str(), msg.size()); + return node.sock->write(data, size); } else { - return sock->send(data, size, node->addr); + DHT_LOG.d(node.id, "[node %s] send (datagram) %lu", node.getId().to_c_str(), msg.size()); + return sock->send(data, size, node.addr); } } int -NetworkEngine::send(const Blob& msg, int /*flags*/, const Sp<Node>& node) +NetworkEngine::send(const Blob& msg, int /*flags*/, const Node& node) { + DHT_LOG.d(node.id, "[node %s] send %lu", node.getId().to_c_str(), msg.size()); uint8_t* data; size_t size; - if (node->id == InfoHash{}) { + if (node.id == InfoHash{}) { size = msg.size(); data = (uint8_t*)malloc(size);//(uint8_t*)msg.release(); std::memcpy(data, msg.data(), size); } else { Blob encrypted_out; { - Blob encrypted = id_key.encrypt((const uint8_t *)msg.data(), msg.size(), node->id); + Blob encrypted = id_key.encrypt((const uint8_t *)msg.data(), msg.size(), node.id); msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); pk.pack_array(2); @@ -819,7 +834,7 @@ NetworkEngine::send(const Blob& msg, int /*flags*/, const Sp<Node>& node) pk.pack_bin(encrypted.size()); pk.pack_bin_body((const char *)encrypted.data(), encrypted.size()); - encrypted_out = node->id.encrypt((const uint8_t *)buffer.data(), buffer.size()); + encrypted_out = node.id.encrypt((const uint8_t *)buffer.data(), buffer.size()); } msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); @@ -828,10 +843,12 @@ NetworkEngine::send(const Blob& msg, int /*flags*/, const Sp<Node>& node) size = buffer.size(); data = (uint8_t*)buffer.release(); } - if (node->canStream()) { - return node->sock->write(data, size); + if (node.canStream()) { + DHT_LOG.d(node.id, "[node %s] send (stream) %lu", node.getId().to_c_str(), msg.size()); + return node.sock->write(data, size); } else { - return sock->send(data, size, node->addr); + DHT_LOG.d(node.id, "[node %s] send (datagram) %lu", node.getId().to_c_str(), msg.size()); + return sock->send(data, size, node.addr); } } @@ -891,7 +908,7 @@ NetworkEngine::sendPong(const Sp<Node>& node, TransId tid) { pk.pack(std::string("n")); pk.pack(network); } - send(buffer, node); + send(buffer, *node); } Sp<Request> @@ -1154,7 +1171,7 @@ NetworkEngine::sendNodesValues(const Sp<Node>& node, TransId tid, const Blob& no } // send response - send(buffer, node); + send(buffer, *node); // send parts if (not svals.empty()) @@ -1309,7 +1326,7 @@ NetworkEngine::sendListenConfirmation(const Sp<Node>& node, TransId tid) { pk.pack(std::string("n")); pk.pack(network); } - send(buffer, node); + send(buffer, *node); } Sp<Request> @@ -1442,7 +1459,7 @@ NetworkEngine::sendValueAnnounced(const Sp<Node>& node, TransId tid, Value::Id v pk.pack(std::string("n")); pk.pack(network); } - send(buffer, node); + send(buffer, *node); } void @@ -1450,7 +1467,8 @@ NetworkEngine::sendError(const Sp<Node>& node, TransId tid, uint16_t code, const std::string& message, - bool include_id) + bool include_id, + const Sp<TcpSocket>& tcpsock) { msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); @@ -1473,7 +1491,7 @@ NetworkEngine::sendError(const Sp<Node>& node, pk.pack(std::string("n")); pk.pack(network); } - send(buffer, node); + send(buffer, *node, tcpsock); } void diff --git a/src/search.h b/src/search.h index 81a3fe947747106e6a5c593c202ab08c36a177a5..bcb676994a9df983f27242ee2b03fe7342abefc5 100644 --- a/src/search.h +++ b/src/search.h @@ -276,7 +276,7 @@ struct Dht::Search { uint16_t tid; time_point refill_time {time_point::min()}; time_point step_time {time_point::min()}; /* the time of the last search step */ - Sp<Scheduler::Job> nextSearchStep {}; + Sp<Job> nextSearchStep {}; bool expired {false}; /* no node, or all nodes expired */ bool done {false}; /* search is over, cached for later */