Skip to content
Snippets Groups Projects
Commit 01bae956 authored by Andreas Traczyk's avatar Andreas Traczyk
Browse files

misc: clang-format src/jamidht/conversation.cpp

Change-Id: I0cf8e6a90c6852db6b65ef9d8e7681a237e67431
parent 2b14e50f
No related branches found
No related tags found
No related merge requests found
......@@ -187,22 +187,24 @@ public:
if (auto shared = account_.lock()) {
ioContext_ = Manager::instance().ioContext();
fallbackTimer_ = std::make_unique<asio::steady_timer>(*ioContext_);
swarmManager_ = std::make_shared<SwarmManager>(NodeId(shared->currentDeviceId()),
Manager::instance().getSeededRandomEngine(),
[account=account_](const DeviceId& deviceId) {
if (auto acc = account.lock()) {
return acc->isConnectedWith(deviceId);
}
return false;
});
swarmManager_
= std::make_shared<SwarmManager>(NodeId(shared->currentDeviceId()),
Manager::instance().getSeededRandomEngine(),
[account = account_](const DeviceId& deviceId) {
if (auto acc = account.lock()) {
return acc->isConnectedWith(deviceId);
}
return false;
});
swarmManager_->setMobility(shared->isMobile());
accountId_ = shared->getAccountID();
transferManager_ = std::make_shared<TransferManager>(shared->getAccountID(),
"",
repository_->id(),
Manager::instance().getSeededRandomEngine());
transferManager_
= std::make_shared<TransferManager>(shared->getAccountID(),
"",
repository_->id(),
Manager::instance().getSeededRandomEngine());
conversationDataPath_ = fileutils::get_data_dir() / shared->getAccountID()
/ "conversation_data" / repository_->id();
/ "conversation_data" / repository_->id();
fetchedPath_ = conversationDataPath_ / "fetched";
sendingPath_ = conversationDataPath_ / "sending";
lastDisplayedPath_ = conversationDataPath_ / ConversationMapKeys::LAST_DISPLAYED;
......@@ -629,7 +631,8 @@ public:
std::weak_ptr<JamiAccount> account_;
std::atomic_bool isRemoving_ {false};
std::vector<std::map<std::string, std::string>> loadMessages(const LogOptions& options);
std::vector<libjami::SwarmMessage> loadMessages2(const LogOptions& options, History* optHistory = nullptr);
std::vector<libjami::SwarmMessage> loadMessages2(const LogOptions& options,
History* optHistory = nullptr);
void pull();
std::vector<std::map<std::string, std::string>> mergeHistory(const std::string& uri);
......@@ -675,9 +678,10 @@ public:
* Loaded history represents the linearized history to show for clients
*/
mutable History loadedHistory_ {};
std::vector<libjami::SwarmMessage> addToHistory(const std::vector<std::map<std::string, std::string>>& commits,
bool messageReceived = false,
History* history = nullptr) const;
std::vector<libjami::SwarmMessage> addToHistory(
const std::vector<std::map<std::string, std::string>>& commits,
bool messageReceived = false,
History* history = nullptr) const;
// While loading the history, we need to avoid:
// - reloading history (can just be ignored)
// - adding new commits (should wait for history to be loaded)
......@@ -685,9 +689,14 @@ public:
mutable std::mutex historyMtx_ {};
mutable std::condition_variable historyCv_ {};
void handleReaction(History& history, const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const;
void handleEdition(History& history, const std::shared_ptr<libjami::SwarmMessage>& sharedCommit, bool messageReceived) const;
bool handleMessage(History& history, const std::shared_ptr<libjami::SwarmMessage>& sharedCommit, bool messageReceived) const;
void handleReaction(History& history,
const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const;
void handleEdition(History& history,
const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
bool messageReceived) const;
bool handleMessage(History& history,
const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
bool messageReceived) const;
};
bool
......@@ -787,7 +796,8 @@ Conversation::Impl::loadMessages(const LogOptions& options)
std::vector<ConversationCommit> commits;
auto startLogging = options.from == "";
auto breakLogging = false;
repository_->log([&](const auto& id, const auto& author, const auto& commit) {
repository_->log(
[&](const auto& id, const auto& author, const auto& commit) {
if (!commits.empty()) {
// Set linearized parent
commits.rbegin()->linearized_parent = id;
......@@ -795,8 +805,7 @@ Conversation::Impl::loadMessages(const LogOptions& options)
if (options.skipMerge && git_commit_parentcount(commit.get()) > 1) {
return CallbackResult::Skip;
}
if ((options.nbOfCommits != 0
&& commits.size() == options.nbOfCommits))
if ((options.nbOfCommits != 0 && commits.size() == options.nbOfCommits))
return CallbackResult::Break; // Stop logging
if (breakLogging)
return CallbackResult::Break; // Stop logging
......@@ -825,9 +834,7 @@ Conversation::Impl::loadMessages(const LogOptions& options)
return CallbackResult::Ok; // Continue
},
[&](auto&& cc) {
commits.emplace(commits.end(), std::forward<decltype(cc)>(cc));
},
[&](auto&& cc) { commits.emplace(commits.end(), std::forward<decltype(cc)>(cc)); },
[](auto, auto, auto) { return false; },
options.from,
options.logIfNotFound);
......@@ -850,14 +857,17 @@ Conversation::Impl::loadMessages2(const LogOptions& options, History* optHistory
auto currentHistorySize = loadedHistory_.messageList.size();
std::vector<std::string> replies;
std::vector<libjami::SwarmMessage> ret;
repository_->log([&](const auto& id, const auto& author, const auto& commit) {
repository_->log(
[&](const auto& id, const auto& author, const auto& commit) {
if (options.skipMerge && git_commit_parentcount(commit.get()) > 1) {
return CallbackResult::Skip;
}
if (replies.empty()) { // This avoid load until
// NOTE: in the future, we may want to add "Reply-Body" in commit to avoid to load until this commit
// NOTE: in the future, we may want to add "Reply-Body" in commit to avoid to load
// until this commit
if ((options.nbOfCommits != 0
&& (loadedHistory_.messageList.size() - currentHistorySize) == options.nbOfCommits))
&& (loadedHistory_.messageList.size() - currentHistorySize)
== options.nbOfCommits))
return CallbackResult::Break; // Stop logging
if (breakLogging)
return CallbackResult::Break; // Stop logging
......@@ -910,7 +920,8 @@ Conversation::Impl::loadMessages2(const LogOptions& options, History* optHistory
}
void
Conversation::Impl::handleReaction(History& history, const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const
Conversation::Impl::handleReaction(History& history,
const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const
{
auto it = history.quickAccess.find(sharedCommit->body.at("react-to"));
auto peditIt = history.pendingEditions.find(sharedCommit->id);
......@@ -923,14 +934,19 @@ Conversation::Impl::handleReaction(History& history, const std::shared_ptr<libja
}
if (it != history.quickAccess.end()) {
it->second->reactions.emplace_back(sharedCommit->body);
emitSignal<libjami::ConversationSignal::ReactionAdded>(accountId_, repository_->id(), it->second->id, sharedCommit->body);
emitSignal<libjami::ConversationSignal::ReactionAdded>(accountId_,
repository_->id(),
it->second->id,
sharedCommit->body);
} else {
history.pendingReactions[sharedCommit->body.at("react-to")].emplace_back(sharedCommit->body);
}
}
void
Conversation::Impl::handleEdition(History& history, const std::shared_ptr<libjami::SwarmMessage>& sharedCommit, bool messageReceived) const
Conversation::Impl::handleEdition(History& history,
const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
bool messageReceived) const
{
auto editId = sharedCommit->body.at("edit");
auto it = history.quickAccess.find(editId);
......@@ -946,21 +962,29 @@ Conversation::Impl::handleEdition(History& history, const std::shared_ptr<libjam
auto itPending = history.pendingReactions.find(itReact->second);
if (it != history.quickAccess.end()) {
baseCommit = it->second; // Base commit
auto itPreviousReact = std::find_if(baseCommit->reactions.begin(), baseCommit->reactions.end(), [&](const auto& reaction) {
return reaction.at("id") == editId;
});
auto itPreviousReact = std::find_if(baseCommit->reactions.begin(),
baseCommit->reactions.end(),
[&](const auto& reaction) {
return reaction.at("id") == editId;
});
if (itPreviousReact != baseCommit->reactions.end()) {
(*itPreviousReact)["body"] = body;
if (body.empty()) {
baseCommit->reactions.erase(itPreviousReact);
emitSignal<libjami::ConversationSignal::ReactionRemoved>(accountId_, repository_->id(), baseCommit->id, editId);
emitSignal<libjami::ConversationSignal::ReactionRemoved>(accountId_,
repository_
->id(),
baseCommit->id,
editId);
}
}
} else if (itPending != history.pendingReactions.end()) {
// Else edit if pending
auto itReaction = std::find_if(itPending->second.begin(), itPending->second.end(), [&](const auto& reaction) {
return reaction.at("id") == editId;
});
auto itReaction = std::find_if(itPending->second.begin(),
itPending->second.end(),
[&](const auto& reaction) {
return reaction.at("id") == editId;
});
if (itReaction != itPending->second.end()) {
(*itReaction)["body"] = body;
if (body.empty())
......@@ -968,24 +992,28 @@ Conversation::Impl::handleEdition(History& history, const std::shared_ptr<libjam
}
} else {
// Add to pending edtions
messageReceived? history.pendingEditions[editId].emplace_front(sharedCommit)
messageReceived ? history.pendingEditions[editId].emplace_front(sharedCommit)
: history.pendingEditions[editId].emplace_back(sharedCommit);
}
} else {
// Normal message
it->second->editions.emplace(it->second->editions.begin(), it->second->body);
it->second->body["body"] = sharedCommit->body["body"];
emitSignal<libjami::ConversationSignal::SwarmMessageUpdated>(accountId_, repository_->id(), *it->second);
emitSignal<libjami::ConversationSignal::SwarmMessageUpdated>(accountId_,
repository_->id(),
*it->second);
}
}
} else {
messageReceived? history.pendingEditions[editId].emplace_front(sharedCommit)
: history.pendingEditions[editId].emplace_back(sharedCommit);
messageReceived ? history.pendingEditions[editId].emplace_front(sharedCommit)
: history.pendingEditions[editId].emplace_back(sharedCommit);
}
}
bool
Conversation::Impl::handleMessage(History& history, const std::shared_ptr<libjami::SwarmMessage>& sharedCommit, bool messageReceived) const
Conversation::Impl::handleMessage(History& history,
const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
bool messageReceived) const
{
if (messageReceived) {
// For a received message, we place it at the beginning of the list
......@@ -1002,7 +1030,7 @@ Conversation::Impl::handleMessage(History& history, const std::shared_ptr<libjam
// Handle pending reactions/editions
auto reactIt = history.pendingReactions.find(sharedCommit->id);
if (reactIt != history.pendingReactions.end()) {
for (const auto& commitBody: reactIt->second)
for (const auto& commitBody : reactIt->second)
sharedCommit->reactions.emplace_back(commitBody);
history.pendingReactions.erase(reactIt);
}
......@@ -1011,7 +1039,7 @@ Conversation::Impl::handleMessage(History& history, const std::shared_ptr<libjam
auto oldBody = sharedCommit->body;
sharedCommit->body["body"] = peditIt->second.front()->body["body"];
peditIt->second.pop_front();
for (const auto& commit: peditIt->second) {
for (const auto& commit : peditIt->second) {
sharedCommit->editions.emplace_back(commit->body);
}
sharedCommit->editions.emplace_back(oldBody);
......@@ -1019,12 +1047,16 @@ Conversation::Impl::handleMessage(History& history, const std::shared_ptr<libjam
}
// Announce to client
if (messageReceived)
emitSignal<libjami::ConversationSignal::SwarmMessageReceived>(accountId_, repository_->id(), *sharedCommit);
emitSignal<libjami::ConversationSignal::SwarmMessageReceived>(accountId_,
repository_->id(),
*sharedCommit);
return !messageReceived;
}
std::vector<libjami::SwarmMessage>
Conversation::Impl::addToHistory(const std::vector<std::map<std::string, std::string>>& commits, bool messageReceived, History* optHistory) const
Conversation::Impl::addToHistory(const std::vector<std::map<std::string, std::string>>& commits,
bool messageReceived,
History* optHistory) const
{
if (messageReceived && (!optHistory && isLoadingHistory_)) {
std::unique_lock<std::mutex> lk(historyMtx_);
......@@ -1150,7 +1182,8 @@ Conversation::gitSocket(const DeviceId& deviceId) const
}
void
Conversation::addGitSocket(const DeviceId& deviceId, const std::shared_ptr<dhtnet::ChannelSocket>& socket)
Conversation::addGitSocket(const DeviceId& deviceId,
const std::shared_ptr<dhtnet::ChannelSocket>& socket)
{
pimpl_->addGitSocket(deviceId, socket);
}
......@@ -1303,7 +1336,7 @@ Conversation::removeMember(const std::string& contactUri, bool isDevice, const O
for (const auto& [deviceId, _] : sthis->pimpl_->gitSocketList_)
if (contactUri == sthis->uriFromDevice(deviceId.toString()))
gitToRm.emplace_back(deviceId);
for (const auto& did: gitToRm)
for (const auto& did : gitToRm)
sthis->removeGitSocket(did);
}
......@@ -1698,7 +1731,7 @@ Conversation::sync(const std::string& member,
{
JAMI_INFO() << "Sync " << id() << " with " << deviceId;
pull(deviceId, std::move(cb), commitId);
dht::ThreadPool::io().run([member, deviceId, a = pimpl_->account_, w=weak_from_this()]{
dht::ThreadPool::io().run([member, deviceId, a = pimpl_->account_, w = weak_from_this()] {
auto sthis = w.lock();
if (auto account = a.lock()) {
// For waiting request, downloadFile
......@@ -1838,8 +1871,7 @@ Conversation::preferences(bool includeLastModified) const
msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
oh.get().convert(preferences);
if (includeLastModified)
preferences[LAST_MODIFIED] =
std::to_string(fileutils::lastWriteTimeInSeconds(filePath));
preferences[LAST_MODIFIED] = std::to_string(fileutils::lastWriteTimeInSeconds(filePath));
return preferences;
} catch (const std::exception& e) {
}
......@@ -1931,29 +1963,30 @@ Conversation::downloadFile(const std::string& interactionId,
return false;
}
auto totalSize = to_int<ssize_t>(size_str->second, (ssize_t)-1);
auto totalSize = to_int<ssize_t>(size_str->second, (ssize_t) -1);
if (totalSize < 0) {
JAMI_ERROR("Invalid file size {}", totalSize);
return false;
}
// Be sure to not lock conversation
dht::ThreadPool().io().run(
[w = weak(),
deviceId,
fileId, interactionId, sha3sum=sha3sum->second, path, totalSize, start, end] {
if (auto shared = w.lock()) {
auto acc = shared->pimpl_->account_.lock();
if (!acc)
return;
shared->dataTransfer()->waitForTransfer(fileId,
interactionId,
sha3sum,
path,
totalSize);
acc->askForFileChannel(shared->id(), deviceId, interactionId, fileId, start, end);
}
});
dht::ThreadPool().io().run([w = weak(),
deviceId,
fileId,
interactionId,
sha3sum = sha3sum->second,
path,
totalSize,
start,
end] {
if (auto shared = w.lock()) {
auto acc = shared->pimpl_->account_.lock();
if (!acc)
return;
shared->dataTransfer()->waitForTransfer(fileId, interactionId, sha3sum, path, totalSize);
acc->askForFileChannel(shared->id(), deviceId, interactionId, fileId, start, end);
}
});
return true;
}
......@@ -2102,12 +2135,11 @@ Conversation::displayed() const
{
try {
std::map<std::string, std::string> lastDisplayed;
auto filePath = pimpl_->conversationDataPath_/ ConversationMapKeys::LAST_DISPLAYED;
auto filePath = pimpl_->conversationDataPath_ / ConversationMapKeys::LAST_DISPLAYED;
auto file = fileutils::loadFile(filePath);
msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
oh.get().convert(lastDisplayed);
lastDisplayed[LAST_MODIFIED] =
std::to_string(fileutils::lastWriteTimeInSeconds(filePath));
lastDisplayed[LAST_MODIFIED] = std::to_string(fileutils::lastWriteTimeInSeconds(filePath));
return lastDisplayed;
} catch (const std::exception& e) {
}
......@@ -2128,8 +2160,7 @@ Conversation::checkBootstrapMember(const asio::error_code& ec,
{
auto acc = pimpl_->account_.lock();
if (ec == asio::error::operation_aborted
or pimpl_->swarmManager_->getRoutingTable().getNodes().size() > 0
or not acc)
or pimpl_->swarmManager_->getRoutingTable().getNodes().size() > 0 or not acc)
return;
// We bootstrap the DRT with devices who already wrote in the repository.
// However, in a conversation, a large number of devices may just watch
......@@ -2194,7 +2225,8 @@ Conversation::checkBootstrapMember(const asio::error_code& ec,
}
void
Conversation::bootstrap(std::function<void()> onBootstraped, const std::vector<DeviceId>& knownDevices)
Conversation::bootstrap(std::function<void()> onBootstraped,
const std::vector<DeviceId>& knownDevices)
{
if (!pimpl_ || !pimpl_->repository_ || !pimpl_->swarmManager_)
return;
......@@ -2259,7 +2291,7 @@ Conversation::commitsEndedCalls()
auto commits = pimpl_->commitsEndedCalls();
if (!commits.empty()) {
// Announce to client
dht::ThreadPool::io().run([w = weak(), commits]{
dht::ThreadPool::io().run([w = weak(), commits] {
if (auto sthis = w.lock())
sthis->pimpl_->announce(commits);
});
......@@ -2301,7 +2333,7 @@ Conversation::addSwarmChannel(std::shared_ptr<dhtnet::ChannelSocket> channel)
return;
auto member = cert->issuer->getId().toString();
pimpl_->swarmManager_->addChannel(std::move(channel));
dht::ThreadPool::io().run([member, deviceId, a = pimpl_->account_, w=weak_from_this()]{
dht::ThreadPool::io().run([member, deviceId, a = pimpl_->account_, w = weak_from_this()] {
auto sthis = w.lock();
if (auto account = a.lock()) {
account->sendProfile(sthis->id(), member, deviceId.toString());
......@@ -2342,10 +2374,12 @@ Conversation::search(uint32_t req,
std::vector<std::map<std::string, std::string>> commits {};
// std::regex_constants::ECMAScript is the default flag.
auto re = std::regex(filter.regexSearch,
filter.caseSensitive ? std::regex_constants::ECMAScript
: std::regex_constants::icase);
sthis->pimpl_->repository_->log([&](const auto& id, const auto& author, auto& commit) {
if (!filter.author.empty() && filter.author != sthis->uriFromDevice(author.email)) {
filter.caseSensitive ? std::regex_constants::ECMAScript
: std::regex_constants::icase);
sthis->pimpl_->repository_->log(
[&](const auto& id, const auto& author, auto& commit) {
if (!filter.author.empty()
&& filter.author != sthis->uriFromDevice(author.email)) {
// Filter author
return CallbackResult::Skip;
}
......@@ -2360,13 +2394,15 @@ Conversation::search(uint32_t req,
return CallbackResult::Break;
else
return CallbackResult::Skip; // Because we are sorting it with
// GIT_SORT_TOPOLOGICAL | GIT_SORT_TIME
// GIT_SORT_TOPOLOGICAL | GIT_SORT_TIME
}
return CallbackResult::Ok; // Continue
},
[&](auto&& cc) {
sthis->pimpl_->addToHistory({*sthis->pimpl_->repository_->convCommitToMap(cc)}, false, &history);
sthis->pimpl_->addToHistory({*sthis->pimpl_->repository_->convCommitToMap(cc)},
false,
&history);
},
[&](auto id, auto, auto) {
if (id == filter.lastId)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment