Skip to content
Snippets Groups Projects
Commit 6b78a4ca authored by Adrien Béraud's avatar Adrien Béraud
Browse files

peerdiscovery: catch parsing errors

parent 66d3a8ae
No related branches found
No related tags found
No related merge requests found
......@@ -116,7 +116,7 @@ private:
/**
* Receive messages
*/
std::pair<SockAddr, Blob> recvFrom();
std::pair<SockAddr, msgpack::object_handle> recvFrom();
/**
* Listener pack thread loop
......@@ -261,27 +261,23 @@ PeerDiscovery::DomainPeerDiscovery::startDiscovery(const std::string &type, Serv
}
}
std::pair<SockAddr, Blob>
std::pair<SockAddr, msgpack::object_handle>
PeerDiscovery::DomainPeerDiscovery::recvFrom()
{
sockaddr_storage storeage_recv;
socklen_t sa_len = sizeof(storeage_recv);
std::array<uint8_t, 64 * 1024> recv;
sockaddr_storage sa;
socklen_t sa_len = sizeof(sa);
std::array<uint8_t, 64 * 1024> buf;
ssize_t nbytes = recvfrom(
sockfd_,
recv.data(),
recv.size(),
(char*)buf.data(), buf.size(),
0,
(sockaddr*)&storeage_recv,
&sa_len
(sockaddr*)&sa, &sa_len
);
if (nbytes < 0) {
if (nbytes < 0)
throw std::runtime_error(std::string("Error receiving packet: ") + strerror(errno));
}
SockAddr ret {storeage_recv, sa_len};
return {ret, Blob(recv.begin(), recv.begin()+nbytes+1)};
return { SockAddr { sa, sa_len }, msgpack::unpack(reinterpret_cast<char*>(buf.data()), nbytes) };
}
void
......@@ -304,7 +300,8 @@ PeerDiscovery::DomainPeerDiscovery::listenerpack_thread()
FD_SET(stop_readfd, &readfds);
FD_SET(sockfd_, &readfds);
int data_coming = select(sockfd_ > stop_readfd ? sockfd_ + 1 : stop_readfd + 1, &readfds, nullptr, nullptr, nullptr);
int data_coming = select(std::max(sockfd_, stop_readfd) + 1, &readfds, nullptr, nullptr, nullptr);
{
std::unique_lock<std::mutex> lck(dmtx_);
......@@ -317,30 +314,31 @@ PeerDiscovery::DomainPeerDiscovery::listenerpack_thread()
perror("Select Error");
std::this_thread::sleep_for( std::chrono::seconds(1) );
}
}
if (data_coming > 0) {
} else if (data_coming > 0) {
if (FD_ISSET(stop_readfd, &readfds)) {
std::array<uint8_t, 64 * 1024> buf;
recv(stop_readfd, (char*)buf.data(), buf.size(), 0);
}
auto rcv = recvFrom();
msgpack::object_handle oh = msgpack::unpack(reinterpret_cast<char*>(rcv.second.data()), rcv.second.size());
msgpack::object obj = oh.get();
try {
auto rcv = recvFrom();
msgpack::object obj = rcv.second.get();
if (obj.type != msgpack::type::MAP)
continue;
for (unsigned i = 0; i < obj.via.map.size; i++) {
auto& o = obj.via.map.ptr[i];
if (o.key.type != msgpack::type::STR)
if (obj.type != msgpack::type::MAP)
continue;
auto key = o.key.as<std::string>();
std::unique_lock<std::mutex> lck(dmtx_);
auto callback = callbackmap_.find(key);
if (callback != callbackmap_.end()){
callback->second(std::move(o.val), std::move(rcv.first));
for (unsigned i = 0; i < obj.via.map.size; i++) {
auto& o = obj.via.map.ptr[i];
if (o.key.type != msgpack::type::STR)
continue;
auto key = o.key.as<std::string>();
std::lock_guard<std::mutex> lck(dmtx_);
auto callback = callbackmap_.find(key);
if (callback != callbackmap_.end()){
callback->second(std::move(o.val), std::move(rcv.first));
}
}
} catch (const std::exception& e) {
std::cerr << "Error receiving packet: " << e.what() << std::endl;
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment