Commit 78515e37 authored by Adrien Béraud's avatar Adrien Béraud
Browse files

jamiaccount: simplify forEachDevice

Change-Id: I97e818f48ccc0441826e4df152f95dbe17fb7ed6
parent 6af72289
......@@ -412,21 +412,21 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std::
// Find listening devices for this account
dht::InfoHash peer_account(toUri);
forEachDevice(peer_account, [wCall, toUri, peer_account](const std::shared_ptr<JamiAccount>& sthis, const dht::InfoHash& dev)
forEachDevice(peer_account, [this, wCall, toUri, peer_account](const dht::InfoHash& dev)
{
auto call = wCall.lock();
if (not call) return;
JAMI_DBG("[call %s] calling device %s", call->getCallId().c_str(), dev.toString().c_str());
auto& manager = Manager::instance();
auto dev_call = manager.callFactory.newCall<SIPCall, JamiAccount>(*sthis, manager.getNewCallID(),
auto dev_call = manager.callFactory.newCall<SIPCall, JamiAccount>(*this, manager.getNewCallID(),
Call::CallType::OUTGOING,
call->getDetails());
std::weak_ptr<SIPCall> weak_dev_call = dev_call;
dev_call->setIPToIP(true);
dev_call->setSecure(sthis->isTlsEnabled());
auto ice = sthis->createIceTransport(("sip:" + dev_call->getCallId()).c_str(),
ICE_COMPONENTS, true, sthis->getIceOptions());
dev_call->setSecure(isTlsEnabled());
auto ice = createIceTransport(("sip:" + dev_call->getCallId()).c_str(),
ICE_COMPONENTS, true, getIceOptions());
if (not ice) {
JAMI_WARN("Can't create ICE");
dev_call->removeCall();
......@@ -435,7 +435,7 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std::
call->addSubCall(*dev_call);
manager.addTask([sthis, weak_dev_call, ice, dev, toUri, peer_account] {
manager.addTask([sthis=shared(), weak_dev_call, ice, dev, toUri, peer_account] {
auto call = weak_dev_call.lock();
// call aborted?
......@@ -509,7 +509,7 @@ JamiAccount::startOutgoingCall(const std::shared_ptr<SIPCall>& call, const std::
sthis->checkPendingCallsTask();
return false;
});
}, [wCall](const std::shared_ptr<JamiAccount>&, bool ok){
}, [wCall](bool ok){
if (not ok) {
if (auto call = wCall.lock()) {
JAMI_WARN("[call:%s] no devices found", call->getCallId().c_str());
......@@ -1044,8 +1044,7 @@ generatePIN(size_t length = 8)
void
JamiAccount::addDevice(const std::string& password)
{
auto this_ = std::static_pointer_cast<JamiAccount>(shared_from_this());
ThreadPool::instance().run([this_,password]() {
ThreadPool::instance().run([this_=shared(), password]() {
std::vector<uint8_t> key;
dht::InfoHash loc;
std::string pin_str;
......@@ -1293,8 +1292,7 @@ JamiAccount::createAccount(const std::string& archive_password, dht::crypto::Ide
{
JAMI_WARN("[Account %s] creating new account", getAccountID().c_str());
setRegistrationState(RegistrationState::INITIALIZING);
auto sthis = std::static_pointer_cast<JamiAccount>(shared_from_this());
ThreadPool::instance().run([sthis,archive_password,migrate]() mutable {
ThreadPool::instance().run([sthis=shared(), archive_password, migrate]() mutable {
AccountArchive a;
auto& this_ = *sthis;
......@@ -2517,7 +2515,6 @@ JamiAccount::replyToIncomingIceMsg(const std::shared_ptr<SIPCall>& call,
registerDhtAddress(*ice);
// Asynchronous DHT put of our local ICE data
auto shared_this = std::static_pointer_cast<JamiAccount>(shared_from_this());
dht_.putEncrypted(
callKey_,
peer_ice_msg.from,
......@@ -2602,7 +2599,6 @@ JamiAccount::connectivityChanged()
return;
}
auto shared = std::static_pointer_cast<JamiAccount>(shared_from_this());
dht_.connectivityChanged();
}
......@@ -3191,10 +3187,10 @@ JamiAccount::sendTrustRequest(const std::string& to, const std::vector<uint8_t>&
return;
}
addContact(toH);
forEachDevice(toH, [toH,payload](const std::shared_ptr<JamiAccount>& shared, const dht::InfoHash& dev)
forEachDevice(toH, [this,toH,payload](const dht::InfoHash& dev)
{
JAMI_WARN("[Account %s] sending trust request to: %s / %s", shared->getAccountID().c_str(), toH.toString().c_str(), dev.toString().c_str());
shared->dht_.putEncrypted(dht::InfoHash::get("inbox:"+dev.toString()),
JAMI_WARN("[Account %s] sending trust request to: %s / %s", getAccountID().c_str(), toH.toString().c_str(), dev.toString().c_str());
dht_.putEncrypted(dht::InfoHash::get("inbox:"+dev.toString()),
dev,
dht::TrustRequest(DHT_TYPE_NS, payload));
});
......@@ -3205,10 +3201,10 @@ JamiAccount::sendTrustRequestConfirm(const dht::InfoHash& to)
{
dht::TrustRequest answer {DHT_TYPE_NS};
answer.confirm = true;
forEachDevice(to, [to,answer](const std::shared_ptr<JamiAccount>& shared, const dht::InfoHash& dev)
forEachDevice(to, [this,to,answer](const dht::InfoHash& dev)
{
JAMI_WARN("[Account %s] sending trust request reply: %s / %s", shared->getAccountID().c_str(), to.toString().c_str(), dev.toString().c_str());
shared->dht_.putEncrypted(dht::InfoHash::get("inbox:"+dev.toString()), dev, answer);
JAMI_WARN("[Account %s] sending trust request reply: %s / %s", getAccountID().c_str(), to.toString().c_str(), dev.toString().c_str());
dht_.putEncrypted(dht::InfoHash::get("inbox:"+dev.toString()), dev, answer);
});
}
......@@ -3323,10 +3319,8 @@ JamiAccount::igdChanged()
if (not dht_.isRunning())
return;
if (upnp_) {
auto shared = std::static_pointer_cast<JamiAccount>(shared_from_this());
std::thread{[shared] {
auto& this_ = *shared.get();
auto oldPort = static_cast<in_port_t>(this_.dhtPortUsed_);
std::thread{[s = shared(), oldPort = static_cast<in_port_t>(dhtPortUsed_)] {
auto& this_ = *s;
if (not this_.mapPortUPnP())
JAMI_WARN("UPnP: Could not map DHT port");
auto newPort = static_cast<in_port_t>(this_.dhtPortUsed_);
......@@ -3342,25 +3336,26 @@ JamiAccount::igdChanged()
void
JamiAccount::forEachDevice(const dht::InfoHash& to,
std::function<void(const std::shared_ptr<JamiAccount>&, const dht::InfoHash&)>&& op,
std::function<void(const std::shared_ptr<JamiAccount>&, bool)>&& end)
std::function<void(const dht::InfoHash&)>&& op,
std::function<void(bool)>&& end)
{
auto shared = std::static_pointer_cast<JamiAccount>(shared_from_this());
auto treatedDevices = std::make_shared<std::set<dht::InfoHash>>();
dht_.get<dht::crypto::RevocationList>(to, [to](dht::crypto::RevocationList&& crl){
tls::CertificateStore::instance().pinRevocationList(to.toString(), std::move(crl));
return true;
});
dht_.get<DeviceAnnouncement>(to, [shared,to,treatedDevices,op=std::move(op)](DeviceAnnouncement&& dev) {
dht_.get<DeviceAnnouncement>(to, [this,to,treatedDevices,op=std::move(op)](DeviceAnnouncement&& dev) {
if (dev.from != to)
return true;
if (treatedDevices->emplace(dev.dev).second)
op(shared, dev.dev);
if (treatedDevices->emplace(dev.dev).second) {
op(dev.dev);
}
return true;
}, [=, end=std::move(end)](bool /*ok*/){
JAMI_DBG("[Account %s] found %lu devices for %s",
getAccountID().c_str(), treatedDevices->size(), to.to_c_str());
if (end) end(shared, not treatedDevices->empty());
if (end)
end(not treatedDevices->empty());
});
}
......@@ -3410,71 +3405,65 @@ JamiAccount::sendTextMessage(const std::string& to, const std::map<std::string,
auto confirm = std::make_shared<PendingConfirmation>();
// Find listening devices for this account
forEachDevice(toH, [confirm,to,token,payloads,now](const std::shared_ptr<JamiAccount>& this_, const dht::InfoHash& dev)
forEachDevice(toH, [this,confirm,to,token,payloads,now](const dht::InfoHash& dev)
{
{
std::lock_guard<std::mutex> lock(this_->messageMutex_);
auto e = this_->sentMessages_.emplace(token, PendingMessage {});
std::lock_guard<std::mutex> lock(messageMutex_);
auto e = sentMessages_.emplace(token, PendingMessage {});
e.first->second.to = dev;
}
auto h = dht::InfoHash::get("inbox:"+dev.toString());
std::weak_ptr<JamiAccount> w = this_;
auto list_token = this_->dht_.listen<dht::ImMessage>(h, [to, w, token, confirm](dht::ImMessage&& msg) {
if (auto sthis = w.lock()) {
auto& this_ = *sthis;
// check expected message confirmation
if (msg.id != token)
return true;
{
std::lock_guard<std::mutex> lock(this_.messageMutex_);
auto e = this_.sentMessages_.find(msg.id);
if (e == this_.sentMessages_.end() or e->second.to != msg.from) {
JAMI_DBG() << "[Account " << this_.getAccountID() << "] [message " << token << "] Message not found";
return true;
}
this_.sentMessages_.erase(e);
JAMI_DBG() << "[Account " << this_.getAccountID() << "] [message " << token << "] Received text message reply";
auto list_token = dht_.listen<dht::ImMessage>(h, [this, to, token, confirm](dht::ImMessage&& msg) {
// check expected message confirmation
if (msg.id != token)
return true;
// add treated message
auto res = this_.treatedMessages_.insert(msg.id);
if (!res.second)
return true;
{
std::lock_guard<std::mutex> lock(messageMutex_);
auto e = sentMessages_.find(msg.id);
if (e == sentMessages_.end() or e->second.to != msg.from) {
JAMI_DBG() << "[Account " << getAccountID() << "] [message " << token << "] Message not found";
return true;
}
this_.saveTreatedMessages();
sentMessages_.erase(e);
JAMI_DBG() << "[Account " << getAccountID() << "] [message " << token << "] Received text message reply";
// report message as confirmed received
for (auto& t : confirm->listenTokens)
this_.dht_.cancelListen(t.first, t.second.get());
confirm->listenTokens.clear();
confirm->replied = true;
this_.messageEngine_.onMessageSent(to, token, true);
// add treated message
auto res = treatedMessages_.insert(msg.id);
if (!res.second)
return true;
}
saveTreatedMessages();
// report message as confirmed received
for (auto& t : confirm->listenTokens)
dht_.cancelListen(t.first, t.second.get());
confirm->listenTokens.clear();
confirm->replied = true;
messageEngine_.onMessageSent(to, token, true);
return false;
});
confirm->listenTokens.emplace(h, std::move(list_token));
this_->dht_.putEncrypted(h, dev,
dht_.putEncrypted(h, dev,
dht::ImMessage(token, std::string(payloads.begin()->first), std::string(payloads.begin()->second), now),
[w,to,token,confirm,h](bool ok) {
if (auto this_ = w.lock()) {
JAMI_DBG() << "[Account " << this_->getAccountID() << "] [message " << token << "] Put encrypted " << (ok ? "ok" : "failed");
if (not ok) {
auto lt = confirm->listenTokens.find(h);
if (lt != confirm->listenTokens.end()) {
this_->dht_.cancelListen(h, lt->second.get());
confirm->listenTokens.erase(lt);
}
if (confirm->listenTokens.empty() and not confirm->replied)
this_->messageEngine_.onMessageSent(to, token, false);
[this,to,token,confirm,h](bool ok) {
JAMI_DBG() << "[Account " << getAccountID() << "] [message " << token << "] Put encrypted " << (ok ? "ok" : "failed");
if (not ok) {
auto lt = confirm->listenTokens.find(h);
if (lt != confirm->listenTokens.end()) {
dht_.cancelListen(h, lt->second.get());
confirm->listenTokens.erase(lt);
}
if (confirm->listenTokens.empty() and not confirm->replied)
messageEngine_.onMessageSent(to, token, false);
}
});
JAMI_DBG() << "[Account " << this_->getAccountID() << "] [message " << token << "] Sending message for device " << dev.toString();
}, [to, token](const std::shared_ptr<JamiAccount>& shared, bool ok) {
JAMI_DBG() << "[Account " << getAccountID() << "] [message " << token << "] Sending message for device " << dev.toString();
}, [this, to, token](bool ok) {
if (not ok) {
shared->messageEngine_.onMessageSent(to, token, false);
messageEngine_.onMessageSent(to, token, false);
}
});
......
......@@ -365,8 +365,8 @@ class JamiAccount : public SIPAccountBase {
const std::shared_future<tls::DhParams> dhParams() const { return dhParams_; }
void forEachDevice(const dht::InfoHash& to,
std::function<void(const std::shared_ptr<JamiAccount>&, const dht::InfoHash&)>&& op,
std::function<void(const std::shared_ptr<JamiAccount>&, bool)>&& end = {});
std::function<void(const dht::InfoHash&)>&& op,
std::function<void(bool)>&& end = {});
/**
* Inform that a potential peer device have been found.
......
......@@ -765,23 +765,22 @@ DhtPeerConnector::requestConnection(const std::string& peer_id,
pimpl_->account.forEachDevice(
peer_h,
[this, addresses, connect_cb, tid](const std::shared_ptr<JamiAccount>& account,
const dht::InfoHash& dev_h) {
if (dev_h == account->dht().getId()) {
JAMI_ERR() << account << "[CNX] no connection to yourself, bad person!";
[this, addresses, connect_cb, tid](const dht::InfoHash& dev_h) {
if (dev_h == pimpl_->account.dht().getId()) {
JAMI_ERR() << pimpl_->account.getAccountID() << "[CNX] no connection to yourself, bad person!";
return;
}
account->findCertificate(
pimpl_->account.findCertificate(
dev_h,
[this, dev_h, addresses, connect_cb, tid] (const std::shared_ptr<dht::crypto::Certificate>& cert) {
pimpl_->ctrl << makeMsg<CtrlMsgType::ADD_DEVICE>(dev_h, tid, cert, addresses, connect_cb);
});
},
[peer_h, connect_cb](const std::shared_ptr<JamiAccount>& account, bool found) {
[this, peer_h, connect_cb](bool found) {
if (!found) {
JAMI_WARN() << account << "[CNX] aborted, no devices for " << peer_h;
JAMI_WARN() << pimpl_->account.getAccountID() << "[CNX] aborted, no devices for " << peer_h;
connect_cb(nullptr);
}
});
......@@ -802,18 +801,16 @@ DhtPeerConnector::closeConnection(const std::string& peer_id, const DRing::DataT
pimpl_->ctrl << makeMsg<CtrlMsgType::CANCEL>(peer_h, tid);
pimpl_->account.forEachDevice(
peer_h,
[this, tid](const std::shared_ptr<JamiAccount>& account,
const dht::InfoHash& dev_h) {
if (dev_h == account->dht().getId()) {
JAMI_ERR() << account << "[CNX] no connection to yourself, bad person!";
[this, tid](const dht::InfoHash& dev_h) {
if (dev_h == pimpl_->account.dht().getId()) {
JAMI_ERR() << pimpl_->account.getAccountID() << "[CNX] no connection to yourself, bad person!";
return;
}
pimpl_->ctrl << makeMsg<CtrlMsgType::CANCEL>(dev_h, tid);
},
[peer_h](const std::shared_ptr<JamiAccount>& account, bool found) {
[this, peer_h](bool found) {
if (!found) {
JAMI_WARN() << account << "[CNX] aborted, no devices for " << peer_h;
JAMI_WARN() << pimpl_->account.getAccountID() << "[CNX] aborted, no devices for " << peer_h;
}
});
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment