Skip to content
Snippets Groups Projects
Commit 679ac3ba authored by Adrien Béraud's avatar Adrien Béraud
Browse files

tcp

parent 419fc24f
No related branches found
No related tags found
No related merge requests found
......@@ -180,7 +180,7 @@ NetworkEngine::requestStep(Sp<Request> sreq)
}
// UDP
std::weak_ptr<Request> wreq = sreq;
auto err = send(req.msg, 0, node.getAddr(), [this,wreq](int status) {
auto err = send(req.msg, 0, node, [this,wreq](int status) {
if (auto req = wreq.lock()) {
if (status >= 0) {
++req->attempt_count;
......@@ -471,7 +471,7 @@ Sp<Node>
NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& from, const Sp<TcpSocket>& tcpsock)
{
const auto& now = scheduler.time();
auto node = cache.getNode(msg->id, from, now, tcpsock, true, msg->is_client);
auto node = cache.getNode(msg->id, from, tcpsock, now, true, msg->is_client);
if (msg->type == MessageType::Error or msg->type == MessageType::Reply) {
auto rsocket = node->getSocket(msg->tid.toInt());
......@@ -489,7 +489,7 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro
if (not node->isClient())
onNewNode(node, 1);
DHT_LOG.e(node->id, "[node %s] Can't find transaction", node->toString().c_str());
return;
return node;
}
}
node->update(from, tcpsock);
......@@ -669,7 +669,7 @@ NetworkEngine::send(msgpack::sbuffer& msg, const Node& node, const Sp<TcpSocket>
return node.sock->write(data, size);
} else {
DHT_LOG.d(node.id, "[node %s] send (datagram) %lu", node.getId().to_c_str(), msg.size());
return sock->send(data, size, node.addr);
return sock->send(data, size, node.getAddr());
}
}
......@@ -708,7 +708,7 @@ NetworkEngine::send(const Blob& msg, int /*flags*/, const Node& node, UdpSocket:
return node.sock->write(data, size);
} else {
DHT_LOG.d(node.id, "[node %s] send (datagram) %lu", node.getId().to_c_str(), msg.size());
return sock->send(data, size, node.addr, std::move(cb));
return sock->send(data, size, node.getAddr(), std::move(cb));
}
}
......@@ -758,7 +758,7 @@ NetworkEngine::sendPong(const Sp<Node>& node, TransId tid) {
pk.pack(std::string("r")); pk.pack_map(2);
pk.pack(std::string("id")); pk.pack(getNodeId());
insertAddr(pk, node->addr);
insertAddr(pk, node->getAddress());
pk.pack(std::string("t")); pk.pack_bin(tid.size());
pk.pack_bin_body((const char*)tid.data(), tid.size());
......@@ -988,7 +988,7 @@ NetworkEngine::sendNodesValues(const Sp<Node>& node, TransId tid, const Blob& no
pk.pack(std::string("r"));
pk.pack_map(2 + (not st.empty()?1:0) + (nodes.size()>0?1:0) + (nodes6.size()>0?1:0) + (not token.empty()?1:0));
pk.pack(std::string("id")); pk.pack(getNodeId());
insertAddr(pk, node->addr);
insertAddr(pk, node->getAddress());
if (nodes.size() > 0) {
pk.pack(std::string("n4"));
pk.pack_bin(nodes.size());
......@@ -1032,7 +1032,7 @@ NetworkEngine::sendNodesValues(const Sp<Node>& node, TransId tid, const Blob& no
// send parts
if (not svals.empty())
sendValueParts(tid, svals, node->addr);
sendValueParts(tid, svals, node->getAddress());
}
Blob
......@@ -1098,7 +1098,7 @@ NetworkEngine::sendListen(const Sp<Node>& n,
//n->startTcp(scheduler.getLoop());
if (not n->sock) {
n->sock = std::make_shared<TcpSocket>(scheduler.getLoop());
n->sock->connect(n->addr.get(), [this,n](int status){
n->sock->connect(n->getAddr().get(), [this,n](int status){
if (status == 0)
startTcp(n->sock, n);
});
......@@ -1169,7 +1169,7 @@ NetworkEngine::sendListenConfirmation(const Sp<Node>& node, TransId tid) {
pk.pack(std::string("r")); pk.pack_map(2);
pk.pack(std::string("id")); pk.pack(getNodeId());
insertAddr(pk, node->addr);
insertAddr(pk, node->getAddress());
pk.pack(std::string("t")); pk.pack_bin(tid.size());
pk.pack_bin_body((const char*)tid.data(), tid.size());
......@@ -1302,7 +1302,7 @@ NetworkEngine::sendValueAnnounced(const Sp<Node>& node, TransId tid, Value::Id v
pk.pack(std::string("r")); pk.pack_map(3);
pk.pack(std::string("id")); pk.pack(getNodeId());
pk.pack(std::string("vid")); pk.pack(vid);
insertAddr(pk, node->addr);
insertAddr(pk, node->getAddress());
pk.pack(std::string("t")); pk.pack_bin(tid.size());
pk.pack_bin_body((const char*)tid.data(), tid.size());
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment