diff --git a/include/opendht/network_engine.h b/include/opendht/network_engine.h index ab29b97e44289d645df4badc2cc59622b1276af8..dd407f876b5c255612458cee01d7b02ec339b9be 100644 --- a/include/opendht/network_engine.h +++ b/include/opendht/network_engine.h @@ -479,10 +479,10 @@ private: }; - int send(msgpack::sbuffer& msg, const Sp<Node>& node); + int send(msgpack::sbuffer& msg, const Node& node, const Sp<TcpSocket>& sock = {}); int sendUDP(msgpack::sbuffer& msg, const SockAddr& addr); - int send(const Blob& msg, int flags, const Sp<Node>& node, UdpSocket::OnSent&& cb); + int send(const Blob& msg, int flags, const Node& node, UdpSocket::OnSent&& cb); void startTcp(const Sp<TcpSocket>& sock, const Sp<Node>& node = {}); @@ -519,7 +519,8 @@ private: TransId tid, uint16_t code, const std::string& message, - bool include_id=false); + bool include_id=false, + const Sp<TcpSocket>& sock= {}); void deserializeNodes(ParsedMessage& msg, const SockAddr& from); diff --git a/include/opendht/node.h b/include/opendht/node.h index 90b78caecbe147e3efffec2e73221f029cf38ee3..aa308236aad510f5fa58dfa3d9066cdce6a3a2bc 100644 --- a/include/opendht/node.h +++ b/include/opendht/node.h @@ -53,7 +53,7 @@ struct Node { Sp<TcpSocket> sock; InfoHash last_known_pk; - //std::list<crypto::EcSecretKey> + //std::list<crypto::EcSecretKey> Node(const InfoHash& id, const SockAddr& addr, const Sp<TcpSocket>& s = {}, bool client=false); Node(const InfoHash& id, const sockaddr* sa, socklen_t salen) @@ -70,6 +70,7 @@ struct Node { std::string getAddrStr() const { return addr.toString(); } + bool isClient() const { return is_client; } bool isIncoming() { return time > reply_time; } @@ -77,8 +78,12 @@ struct Node { const time_point& getReplyTime() const { return reply_time; } void setTime(const time_point& t) { time = t; } + SockAddr getAddress() const { + return canStream() ? sock->getPeerAddr() : addr; + } + void startTCP() { - + } /** diff --git a/src/dht.cpp b/src/dht.cpp index cdbb40c0d43cfca371795fa2fc393d436d6f26ff..0787de29eb65d6a8c5502ff21a7d562e2d81e9bb 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -2237,7 +2237,7 @@ Dht::onListen(Sp<Node> node, const InfoHash& hash, const Blob& token, size_t soc net::DhtProtocolException::LISTEN_NO_INFOHASH }; } - if (not tokenMatch(token, node->getAddr())) { + if (not tokenMatch(token, node->getAddress())) { DHT_LOG.w(hash, node->id, "[node %s] incorrect token %s for 'listen'", node->toString().c_str(), hash.toString().c_str()); throw net::DhtProtocolException {net::DhtProtocolException::UNAUTHORIZED, net::DhtProtocolException::LISTEN_WRONG_TOKEN}; } diff --git a/src/network_engine.cpp b/src/network_engine.cpp index 44e1478101861b83262f00a2f92997a5644d01c6..a01c13fb7e172f4dfbeb8e580b4a1f727dc493e9 100644 --- a/src/network_engine.cpp +++ b/src/network_engine.cpp @@ -165,6 +165,20 @@ NetworkEngine::requestStep(Sp<Request> sreq) req.on_expired(req, false); } + if (node.sock and node.sock->canWrite()) { + auto data = (uint8_t*)malloc(req.msg.size()); + memcpy(data, req.msg.data(), req.msg.size()); + req.last_try = now; + ++req.attempt_count; + node.sock->write(data, req.msg.size(), [&,sreq](int status){ + if (status != 0) { + DHT_LOG.w(sreq->node->id, "write failed ! %d", status); + sreq->node->setExpired(); + } + }); + return; + } + // UDP std::weak_ptr<Request> wreq = sreq; auto err = send(req.msg, 0, node.getAddr(), [this,wreq](int status) { if (auto req = wreq.lock()) { @@ -291,8 +305,10 @@ NetworkEngine::startTcp(const Sp<TcpSocket>& sock, const Sp<Node>& node) msgpack::object_handle result; // Message pack data loop while(rx_data->unpacker.next(result)) { - //std::unique_ptr<ParsedMessage> msg {new ParsedMessage}; + std::unique_ptr<ParsedMessage> msg {new ParsedMessage}; SockAddr from = sock->getPeerAddr(); + if (from.isMappedIPv4()) + from = from.getMappedIPv4(); Sp<Node> node_found; try { const auto& o = result.get(); @@ -302,8 +318,8 @@ NetworkEngine::startTcp(const Sp<TcpSocket>& sock, const Sp<Node>& node) //msg->msgpack_unpack(oh.get()); node_found = processEncrypted(oh.get(), from); } else { - //msg->msgpack_unpack(o); - node_found = process(o, from); + msg->msgpack_unpack(o); + node_found = process(std::move(msg), from, sock); } } catch (const std::exception& e) { DHT_LOG.w("Can't process message: %s", e.what()); @@ -452,10 +468,10 @@ NetworkEngine::process(const msgpack::object& o, const SockAddr& from) } Sp<Node> -NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& from, const Sp<TcpSocket>& sock) +NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& from, const Sp<TcpSocket>& tcpsock) { const auto& now = scheduler.time(); - auto node = cache.getNode(msg->id, from, now, sock, true, msg->is_client); + auto node = cache.getNode(msg->id, from, now, tcpsock, true, msg->is_client); if (msg->type == MessageType::Error or msg->type == MessageType::Reply) { auto rsocket = node->getSocket(msg->tid.toInt()); @@ -476,7 +492,7 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro return; } } - node->update(from, sock); + node->update(from, tcpsock); node->received(now, req); @@ -582,7 +598,8 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro } catch (const std::overflow_error& e) { DHT_LOG.e("Can't send value: buffer not large enough !"); } catch (DhtProtocolException& e) { - sendError(node, msg->tid, e.getCode(), e.getMsg().c_str(), true); + DHT_LOG.w("Error during request handling, sending error ! %p", tcpsock.get()); + sendError(node, msg->tid, e.getCode(), e.getMsg().c_str(), true, tcpsock); } } @@ -617,17 +634,18 @@ NetworkEngine::sendUDP(msgpack::sbuffer& msg, const SockAddr& addr) } int -NetworkEngine::send(msgpack::sbuffer& msg, const Sp<Node>& node) +NetworkEngine::send(msgpack::sbuffer& msg, const Node& node, const Sp<TcpSocket>& tcpsock) { + DHT_LOG.d(node.id, "[node %s] send %lu", node.getId().to_c_str(), msg.size()); uint8_t* data; size_t size; - if (node->id == InfoHash{}) { + if (node.id == InfoHash{}) { size = msg.size(); data = (uint8_t*)msg.release(); } else { Blob encrypted_out; { - Blob encrypted = id_key.encrypt((const uint8_t *)msg.data(), msg.size(), node->id); + Blob encrypted = id_key.encrypt((const uint8_t *)msg.data(), msg.size(), node.id); msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); pk.pack_array(2); @@ -635,7 +653,7 @@ NetworkEngine::send(msgpack::sbuffer& msg, const Sp<Node>& node) pk.pack_bin(encrypted.size()); pk.pack_bin_body((const char *)encrypted.data(), encrypted.size()); - encrypted_out = node->id.encrypt((const uint8_t *)buffer.data(), buffer.size()); + encrypted_out = node.id.encrypt((const uint8_t *)buffer.data(), buffer.size()); } msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); @@ -644,26 +662,31 @@ NetworkEngine::send(msgpack::sbuffer& msg, const Sp<Node>& node) size = buffer.size(); data = (uint8_t*)buffer.release(); } - if (node->canStream()) { - return node->sock->write(data, size); + if (tcpsock) { + return tcpsock->write(data, size); + } else if (node.canStream()) { + DHT_LOG.d(node.id, "[node %s] send (stream) %lu", node.getId().to_c_str(), msg.size()); + return node.sock->write(data, size); } else { - return sock->send(data, size, node->addr); + DHT_LOG.d(node.id, "[node %s] send (datagram) %lu", node.getId().to_c_str(), msg.size()); + return sock->send(data, size, node.addr); } } int -NetworkEngine::send(const Blob& msg, int /*flags*/, const Sp<Node>& node, UdpSocket::OnSent&& cb) +NetworkEngine::send(const Blob& msg, int /*flags*/, const Node& node, UdpSocket::OnSent&& cb) { + DHT_LOG.d(node.id, "[node %s] send %lu", node.getId().to_c_str(), msg.size()); uint8_t* data; size_t size; - if (node->id == InfoHash{}) { + if (node.id == InfoHash{}) { size = msg.size(); data = (uint8_t*)malloc(size);//(uint8_t*)msg.release(); std::memcpy(data, msg.data(), size); } else { Blob encrypted_out; { - Blob encrypted = id_key.encrypt((const uint8_t *)msg.data(), msg.size(), node->id); + Blob encrypted = id_key.encrypt((const uint8_t *)msg.data(), msg.size(), node.id); msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); pk.pack_array(2); @@ -671,7 +694,7 @@ NetworkEngine::send(const Blob& msg, int /*flags*/, const Sp<Node>& node, UdpSoc pk.pack_bin(encrypted.size()); pk.pack_bin_body((const char *)encrypted.data(), encrypted.size()); - encrypted_out = node->id.encrypt((const uint8_t *)buffer.data(), buffer.size()); + encrypted_out = node.id.encrypt((const uint8_t *)buffer.data(), buffer.size()); } msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); @@ -680,10 +703,12 @@ NetworkEngine::send(const Blob& msg, int /*flags*/, const Sp<Node>& node, UdpSoc size = buffer.size(); data = (uint8_t*)buffer.release(); } - if (node->canStream()) { - return node->sock->write(data, size); + if (node.canStream()) { + DHT_LOG.d(node.id, "[node %s] send (stream) %lu", node.getId().to_c_str(), msg.size()); + return node.sock->write(data, size); } else { - return sock->send(data, size, node->addr, std::move(cb)); + DHT_LOG.d(node.id, "[node %s] send (datagram) %lu", node.getId().to_c_str(), msg.size()); + return sock->send(data, size, node.addr, std::move(cb)); } } @@ -743,7 +768,7 @@ NetworkEngine::sendPong(const Sp<Node>& node, TransId tid) { pk.pack(std::string("n")); pk.pack(network); } - send(buffer, node); + send(buffer, *node); } Sp<Request> @@ -1003,7 +1028,7 @@ NetworkEngine::sendNodesValues(const Sp<Node>& node, TransId tid, const Blob& no } // send response - send(buffer, node); + send(buffer, *node); // send parts if (not svals.empty()) @@ -1154,7 +1179,7 @@ NetworkEngine::sendListenConfirmation(const Sp<Node>& node, TransId tid) { pk.pack(std::string("n")); pk.pack(network); } - send(buffer, node); + send(buffer, *node); } Sp<Request> @@ -1287,7 +1312,7 @@ NetworkEngine::sendValueAnnounced(const Sp<Node>& node, TransId tid, Value::Id v pk.pack(std::string("n")); pk.pack(network); } - send(buffer, node); + send(buffer, *node); } void @@ -1295,7 +1320,8 @@ NetworkEngine::sendError(const Sp<Node>& node, TransId tid, uint16_t code, const std::string& message, - bool include_id) + bool include_id, + const Sp<TcpSocket>& tcpsock) { msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); @@ -1318,7 +1344,7 @@ NetworkEngine::sendError(const Sp<Node>& node, pk.pack(std::string("n")); pk.pack(network); } - send(buffer, node); + send(buffer, *node, tcpsock); } void