diff --git a/src/peer_discovery.cpp b/src/peer_discovery.cpp index 5b45e2a3b19dd18fef6e2484520953e155066894..56bd8534aad50e47b2d03abd4ecf111d1d29e8eb 100644 --- a/src/peer_discovery.cpp +++ b/src/peer_discovery.cpp @@ -116,7 +116,7 @@ private: /** * Receive messages */ - std::pair<SockAddr, Blob> recvFrom(); + std::pair<SockAddr, msgpack::object_handle> recvFrom(); /** * Listener pack thread loop @@ -261,27 +261,23 @@ PeerDiscovery::DomainPeerDiscovery::startDiscovery(const std::string &type, Serv } } -std::pair<SockAddr, Blob> +std::pair<SockAddr, msgpack::object_handle> PeerDiscovery::DomainPeerDiscovery::recvFrom() { - sockaddr_storage storeage_recv; - socklen_t sa_len = sizeof(storeage_recv); - std::array<uint8_t, 64 * 1024> recv; + sockaddr_storage sa; + socklen_t sa_len = sizeof(sa); + std::array<uint8_t, 64 * 1024> buf; ssize_t nbytes = recvfrom( sockfd_, - recv.data(), - recv.size(), + (char*)buf.data(), buf.size(), 0, - (sockaddr*)&storeage_recv, - &sa_len + (sockaddr*)&sa, &sa_len ); - if (nbytes < 0) { + if (nbytes < 0) throw std::runtime_error(std::string("Error receiving packet: ") + strerror(errno)); - } - SockAddr ret {storeage_recv, sa_len}; - return {ret, Blob(recv.begin(), recv.begin()+nbytes+1)}; + return { SockAddr { sa, sa_len }, msgpack::unpack(reinterpret_cast<char*>(buf.data()), nbytes) }; } void @@ -304,7 +300,8 @@ PeerDiscovery::DomainPeerDiscovery::listenerpack_thread() FD_SET(stop_readfd, &readfds); FD_SET(sockfd_, &readfds); - int data_coming = select(sockfd_ > stop_readfd ? sockfd_ + 1 : stop_readfd + 1, &readfds, nullptr, nullptr, nullptr); + + int data_coming = select(std::max(sockfd_, stop_readfd) + 1, &readfds, nullptr, nullptr, nullptr); { std::unique_lock<std::mutex> lck(dmtx_); @@ -317,30 +314,31 @@ PeerDiscovery::DomainPeerDiscovery::listenerpack_thread() perror("Select Error"); std::this_thread::sleep_for( std::chrono::seconds(1) ); } - } - - if (data_coming > 0) { + } else if (data_coming > 0) { if (FD_ISSET(stop_readfd, &readfds)) { std::array<uint8_t, 64 * 1024> buf; recv(stop_readfd, (char*)buf.data(), buf.size(), 0); } - auto rcv = recvFrom(); - msgpack::object_handle oh = msgpack::unpack(reinterpret_cast<char*>(rcv.second.data()), rcv.second.size()); - msgpack::object obj = oh.get(); + try { + auto rcv = recvFrom(); + msgpack::object obj = rcv.second.get(); - if (obj.type != msgpack::type::MAP) - continue; - for (unsigned i = 0; i < obj.via.map.size; i++) { - auto& o = obj.via.map.ptr[i]; - if (o.key.type != msgpack::type::STR) + if (obj.type != msgpack::type::MAP) continue; - auto key = o.key.as<std::string>(); - std::unique_lock<std::mutex> lck(dmtx_); - auto callback = callbackmap_.find(key); - if (callback != callbackmap_.end()){ - callback->second(std::move(o.val), std::move(rcv.first)); + for (unsigned i = 0; i < obj.via.map.size; i++) { + auto& o = obj.via.map.ptr[i]; + if (o.key.type != msgpack::type::STR) + continue; + auto key = o.key.as<std::string>(); + std::lock_guard<std::mutex> lck(dmtx_); + auto callback = callbackmap_.find(key); + if (callback != callbackmap_.end()){ + callback->second(std::move(o.val), std::move(rcv.first)); + } } + } catch (const std::exception& e) { + std::cerr << "Error receiving packet: " << e.what() << std::endl; } } }