diff --git a/include/opendht/indexation/pht.h b/include/opendht/indexation/pht.h index 45e7d9cc8098bf4c8f9e95e9555551328fd4d71c..815fb1c1ae2ecaa31f8f28f8378a7b465f73750c 100644 --- a/include/opendht/indexation/pht.h +++ b/include/opendht/indexation/pht.h @@ -105,9 +105,7 @@ struct Prefix { using Value = std::pair<InfoHash, dht::Value::Id>; struct IndexEntry : public dht::Value::Serializable<IndexEntry> { - const ValueType& getType() const { - return ValueType::USER_DATA; - } + static const ValueType TYPE; virtual void unpackValue(const dht::Value& v) { Serializable<IndexEntry>::unpackValue(v); diff --git a/python/opendht.pyx b/python/opendht.pyx index 101e53debcc496cbb342069ef83ad2322ac85e4f..aaad403ba7ddabbc95860a39221ff3270f7317e1 100644 --- a/python/opendht.pyx +++ b/python/opendht.pyx @@ -41,6 +41,17 @@ cimport opendht_cpp as cpp import threading +cdef inline void lookup_callback(cpp.vector[cpp.shared_ptr[cpp.IndexValue]]* values, cpp.Prefix* p, void *user_data) with gil: + cbs = <object>user_data + if 'lookup' in cbs and cbs['lookup']: + vals = [] + for val in deref(values): + v = IndexValue() + v._value = val + vals.append(v) + cbs['lookup'](vals, p.toString()) + ref.Py_DECREF(cbs) + cdef inline void shutdown_callback(void* user_data) with gil: cbs = <object>user_data if 'shutdown' in cbs and cbs['shutdown']: @@ -65,6 +76,12 @@ cdef inline void done_callback(bool done, cpp.vector[shared_ptr[cpp.Node]]* node cbs['done'](done, node_ids) ref.Py_DECREF(cbs) +cdef inline void done_callback_simple(bool done, void *user_data) with gil: + cbs = <object>user_data + if 'done' in cbs and cbs['done']: + cbs['done'](done) + ref.Py_DECREF(cbs) + cdef class _WithID(object): def __repr__(self): return "<%s '%s'>" % (self.__class__.__name__, str(self)) @@ -73,7 +90,7 @@ cdef class _WithID(object): cdef class InfoHash(_WithID): cdef cpp.InfoHash _infohash - def __init__(self, bytes str=b''): + def __cinit__(self, bytes str=b''): self._infohash = cpp.InfoHash(str) def __bool__(InfoHash self): return not (self._infohash == cpp.InfoHash()) @@ -255,28 +272,28 @@ cdef class DhtConfig(object): self._config.dht_config.node_config.node_id = id._infohash cdef class DhtRunner(_WithID): - cdef cpp.DhtRunner* thisptr + cdef cpp.shared_ptr[cpp.DhtRunner] thisptr def __cinit__(self): - self.thisptr = new cpp.DhtRunner() + self.thisptr.reset(new cpp.DhtRunner()) def getId(self): h = InfoHash() - h._infohash = self.thisptr.getId() + h._infohash = self.thisptr.get().getId() return h def getNodeId(self): - return self.thisptr.getNodeId().toString() + return self.thisptr.get().getNodeId().toString() def bootstrap(self, str host, str port): - self.thisptr.bootstrap(host.encode(), port.encode()) + self.thisptr.get().bootstrap(host.encode(), port.encode()) def run(self, Identity id=None, is_bootstrap=False, cpp.in_port_t port=0, str ipv4="", str ipv6="", DhtConfig config=DhtConfig()): if id: config.setIdentity(id) if ipv4 or ipv6: bind4 = ipv4.encode() if ipv4 else b'' bind6 = ipv6.encode() if ipv6 else b'' - self.thisptr.run(bind4, bind6, str(port).encode(), config._config) + self.thisptr.get().run(bind4, bind6, str(port).encode(), config._config) else: - self.thisptr.run(port, config._config) + self.thisptr.get().run(port, config._config) def join(self): - self.thisptr.join() + self.thisptr.get().join() def shutdown(self, shutdown_cb=None): cb_obj = {'shutdown':shutdown_cb} ref.Py_INCREF(cb_obj) @@ -288,16 +305,16 @@ cdef class DhtRunner(_WithID): def enableFileLogging(self, str path): cpp.enableFileLogging(self.thisptr[0], path) def isRunning(self): - return self.thisptr.isRunning() + return self.thisptr.get().isRunning() def getStorageLog(self): - return self.thisptr.getStorageLog().decode() + return self.thisptr.get().getStorageLog().decode() def getRoutingTablesLog(self, cpp.sa_family_t af): - return self.thisptr.getRoutingTablesLog(af).decode() + return self.thisptr.get().getRoutingTablesLog(af).decode() def getSearchesLog(self, cpp.sa_family_t af): - return self.thisptr.getSearchesLog(af).decode() + return self.thisptr.get().getSearchesLog(af).decode() def getNodeMessageStats(self): stats = [] - cdef cpp.vector[unsigned] res = self.thisptr.getNodeMessageStats(False) + cdef cpp.vector[unsigned] res = self.thisptr.get().getNodeMessageStats(False) for n in res: stats.append(n) return stats @@ -305,9 +322,11 @@ cdef class DhtRunner(_WithID): def get(self, InfoHash key, get_cb=None, done_cb=None): """Retreive values associated with a key on the DHT. - key -- the key for which to search - get_cb -- is set, makes the operation non-blocking. Called when a value is found on the DHT. - done_cb -- optional callback used when get_cb is set. Called when the operation is completed. + key -- the key for which to search + get_cb -- is set, makes the operation non-blocking. Called when a value + is found on the DHT. + done_cb -- optional callback used when get_cb is set. Called when the + operation is completed. """ if get_cb: cb_obj = {'get':get_cb, 'done':done_cb} @@ -335,8 +354,8 @@ cdef class DhtRunner(_WithID): def put(self, InfoHash key, Value val, done_cb=None): """Publish a new value on the DHT at key. - key -- the DHT key where to put the value - val -- the value to put on the DHT + key -- the DHT key where to put the value + val -- the value to put on the DHT done_cb -- optional callback called when the operation is completed. """ if done_cb: @@ -369,6 +388,62 @@ cdef class DhtRunner(_WithID): t._t = self.thisptr.listen(t._h, cpp.bindGetCb(get_callback, <void*>cb_obj)).share() return t def cancelListen(self, ListenToken token): - self.thisptr.cancelListen(token._h, token._t) + self.thisptr.get().cancelListen(token._h, token._t) ref.Py_DECREF(<object>token._cb['cb']) # fixme: not thread safe + +cdef class IndexValue(object): + cdef cpp.shared_ptr[cpp.IndexValue] _value + def __init__(self, InfoHash h, cpp.uint64_t vid=0): + cdef cpp.InfoHash hh = h._infohash + self._value.reset(new cpp.IndexValue(hh, vid)) + def getKey(self): + h = InfoHash() + h._infohash = self._value.get().first + return h + def getValueId(self): + return self._value.get().second + +cdef class Pht(object): + cdef cpp.Pht* thisptr + def __cinit__(self, bytes name, DhtRunner dht): + self.thisptr = new cpp.Pht(name, dht.thisptr) + def lookup(self, key, lookup_cb=None, done_cb=None): + """Query the Index with a specified key. + + key -- the key for to the entry in the index. + lookup_cb -- function called when the operation is completed. This + function takes a list of IndexValue objects and a string + representation of the prefix where the value was indexed in + the PHT. + """ + cb_obj = {'lookup':lookup_cb, 'done':done_cb} # TODO: donecallback is to be removed + ref.Py_INCREF(cb_obj) + cdef cpp.IndexKey cppk + for kk, v in key.items(): + cppk[bytes(kk, 'utf-8')] = cpp.Prefix(bytes(v)) + self.thisptr.lookup( + cppk, + cpp.Pht.bindLookupCb(lookup_callback, <void*>cb_obj), + cpp.Dht.bindDoneCbSimple(done_callback_simple, <void*>cb_obj) + ) + def insert(self, key, IndexValue value, done_cb=None): + """Add an index entry to the Index. + + key -- the key for to the entry in the index. + value -- an IndexValue object describing the indexed value. + done_cb -- Called when the operation is completed. + """ + cb_obj = {'done':done_cb} + cdef cpp.IndexKey cppk + for kk, v in key.items(): + cppk[bytes(kk, 'utf-8')] = cpp.Prefix(bytes(v)) + cdef cpp.IndexValue val + val.first = (<InfoHash>value.getKey())._infohash + val.second = value.getValueId() + self.thisptr.insert( + cppk, + val, + cpp.Dht.bindDoneCbSimple(done_callback_simple, <void*>cb_obj) + ) + diff --git a/python/opendht_cpp.pxd b/python/opendht_cpp.pxd index 32056eea77cc1e4fa3921de46a9e377968ebeee9..8f83c17502b2ab0b03cc52e58a6c4d201da698f9 100644 --- a/python/opendht_cpp.pxd +++ b/python/opendht_cpp.pxd @@ -20,11 +20,19 @@ from libcpp cimport bool from libcpp.string cimport string from libcpp.vector cimport vector from libcpp.utility cimport pair -from libcpp.memory cimport shared_ptr +from libcpp.map cimport map ctypedef uint16_t in_port_t ctypedef unsigned short int sa_family_t; +cdef extern from "<memory>" namespace "std" nogil: + cdef cppclass shared_ptr[T]: + shared_ptr() except + + shared_ptr(T*) except + + T* get() + T operator*() + void reset(T*) + cdef extern from "<functional>" namespace "std" nogil: cdef cppclass hash[T]: hash() except + @@ -150,3 +158,20 @@ cdef extern from "opendht/log.h" namespace "dht::log": void disableLogging(DhtRunner& dht) void enableFileLogging(DhtRunner& dht, const string& path) +cdef extern from "opendht/indexation/pht.h" namespace "dht::indexation": + cdef cppclass Prefix: + Prefix() except + + Prefix(vector[uint8_t]) except + + string toString() const + ctypedef pair[InfoHash, uint64_t] IndexValue "dht::indexation::Value" + ctypedef map[string, Prefix] IndexKey "dht::indexation::Pht::Key" + ctypedef void (*LookupCallbackRaw)(vector[shared_ptr[IndexValue]]* values, Prefix* p, void* user_data); + cdef cppclass Pht: + cppclass LookupCallback: + LookupCallback() except + + Pht(string, shared_ptr[DhtRunner]) except + + void lookup(IndexKey k, LookupCallback cb, Dht.DoneCallbackSimple doneCb); + void insert(IndexKey k, IndexValue v, Dht.DoneCallbackSimple cb) + @staticmethod + LookupCallback bindLookupCb(LookupCallbackRaw cb, void *user_data) + diff --git a/tools/dhtnode.cpp b/tools/dhtnode.cpp index 0a66f681bce71f02c2562a50307dae2192775b2d..1480b25fe3964cdf31f366dc8018767cf1eef32d 100644 --- a/tools/dhtnode.cpp +++ b/tools/dhtnode.cpp @@ -38,10 +38,12 @@ void print_id_req() { std::cout << "An identity is required to perform this operation (run with -i)" << std::endl; } -void print_node_info(const DhtRunner& dht, const dht_params& params) { - std::cout << "OpenDht node " << dht.getNodeId() << " running on port " << dht.getBoundPort() << std::endl; +void print_node_info(const std::shared_ptr<DhtRunner>& dht, const dht_params& params) { + std::cout << "OpenDht node " << dht->getNodeId() << " running on port " << dht->getBoundPort() << std::endl; + if (params.is_bootstrap_node) + std::cout << "Running in bootstrap mode (discouraged)." << std::endl; if (params.generate_identity) - std::cout << "Public key ID " << dht.getId() << std::endl; + std::cout << "Public key ID " << dht->getId() << std::endl; } void print_help() { @@ -64,11 +66,14 @@ void print_help() { << " l [key] [where] Listen for value changes at [key]. [where] is the 'where' part of an SQL-ish string." << std::endl << " p [key] [str] Put string value at [key]." << std::endl << " s [key] [str] Put string value at [key], signed with our generated private key." << std::endl - << " e [key] [dest] [str] Put string value at [key], encrypted for [dest] with its public key (if found)." << std::endl + << " e [key] [dest] [str] Put string value at [key], encrypted for [dest] with its public key (if found)." << std::endl; + std::cout << std::endl << "Indexation operations on the DHT:" << std::endl + << " il [name] [key] Lookup the index named [name] with the key [key]." << std::endl + << " ii [name] [key] [value] Inserts the value [value] under the key [key] in the index named [name]." << std::endl << std::endl; } -void cmd_loop(DhtRunner& dht, dht_params& params) +void cmd_loop(std::shared_ptr<DhtRunner>& dht, std::map<std::string, dht::indexation::Pht> indexes, dht_params& params) { print_node_info(dht, params); std::cout << " (type 'h' or 'help' for a list of possible commands)" << std::endl << std::endl; @@ -84,8 +89,8 @@ void cmd_loop(DhtRunner& dht, dht_params& params) break; std::istringstream iss(line); - std::string op, idstr, value; - iss >> op >> idstr; + std::string op, idstr, value, index, keystr; + iss >> op; if (op == "x" || op == "exit" || op == "quit") { break; @@ -96,31 +101,32 @@ void cmd_loop(DhtRunner& dht, dht_params& params) print_node_info(dht, params); unsigned good4, dubious4, cached4, incoming4; unsigned good6, dubious6, cached6, incoming6; - dht.getNodesStats(AF_INET, &good4, &dubious4, &cached4, &incoming4); - dht.getNodesStats(AF_INET6, &good6, &dubious6, &cached6, &incoming6); + dht->getNodesStats(AF_INET, &good4, &dubious4, &cached4, &incoming4); + dht->getNodesStats(AF_INET6, &good6, &dubious6, &cached6, &incoming6); std::cout << "IPv4 nodes : " << good4 << " good, " << dubious4 << " dubious, " << incoming4 << " incoming." << std::endl; std::cout << "IPv6 nodes : " << good6 << " good, " << dubious6 << " dubious, " << incoming6 << " incoming." << std::endl; continue; } else if (op == "lr") { std::cout << "IPv4 routing table:" << std::endl; - std::cout << dht.getRoutingTablesLog(AF_INET) << std::endl; + std::cout << dht->getRoutingTablesLog(AF_INET) << std::endl; std::cout << "IPv6 routing table:" << std::endl; - std::cout << dht.getRoutingTablesLog(AF_INET6) << std::endl; + std::cout << dht->getRoutingTablesLog(AF_INET6) << std::endl; continue; } else if (op == "ld") { - std::cout << dht.getStorageLog() << std::endl; + std::cout << dht->getStorageLog() << std::endl; continue; } else if (op == "ls") { std::cout << "Searches:" << std::endl; - std::cout << dht.getSearchesLog() << std::endl; + std::cout << dht->getSearchesLog() << std::endl; continue; } else if (op == "la") { std::cout << "Reported public addresses:" << std::endl; - auto addrs = dht.getPublicAddressStr(); + auto addrs = dht->getPublicAddressStr(); for (const auto& addr : addrs) std::cout << addr << std::endl; continue; } else if (op == "b") { + iss >> idstr; try { auto addr = splitPort(idstr); if (not addr.first.empty() and addr.second.empty()){ @@ -128,7 +134,7 @@ void cmd_loop(DhtRunner& dht, dht_params& params) ss << DHT_DEFAULT_PORT; addr.second = ss.str(); } - dht.bootstrap(addr.first.c_str(), addr.second.c_str()); + dht->bootstrap(addr.first.c_str(), addr.second.c_str()); } catch (const std::exception& e) { std::cerr << e.what() << std::endl; } @@ -145,24 +151,36 @@ void cmd_loop(DhtRunner& dht, dht_params& params) if (op.empty()) continue; - dht::InfoHash id {idstr}; - static const std::set<std::string> VALID_OPS {"g", "q", "l", "p", "s", "e", "a"}; + static const std::set<std::string> VALID_OPS {"g", "l", "il", "ii", "p", "s", "e", "a"}; if (VALID_OPS.find(op) == VALID_OPS.cend()) { std::cout << "Unknown command: " << op << std::endl; std::cout << " (type 'h' or 'help' for a list of possible commands)" << std::endl; continue; } - static constexpr dht::InfoHash INVALID_ID {}; - if (id == INVALID_ID) { - std::cout << "Syntax error: invalid InfoHash." << std::endl; - continue; + + if (op == "il" or op == "ii") { + // Pht syntax + iss >> index >> keystr; + if (not index.size()) { + std::cout << "You must enter the index name." << std::endl; + continue; + } else { + indexes.emplace(index, dht::indexation::Pht {index, dht}); + } + } + else { + // Dht syntax + iss >> idstr; + InfoHash h {idstr}; + if (not isInfoHash(h)) + continue; } + dht::InfoHash id {idstr}; + // Dht auto start = std::chrono::high_resolution_clock::now(); if (op == "g") { - std::string rem; - std::getline(iss, rem); - dht.get(id, [start](std::shared_ptr<Value> value) { + dht->get(id, [start](std::shared_ptr<Value> value) { auto now = std::chrono::high_resolution_clock::now(); std::cout << "Get: found value (after " << print_dt(now-start) << "s)" << std::endl; std::cout << "\t" << *value << std::endl; @@ -199,7 +217,7 @@ void cmd_loop(DhtRunner& dht, dht_params& params) else if (op == "p") { std::string v; iss >> v; - dht.put(id, dht::Value { + dht->put(id, dht::Value { dht::ValueType::USER_DATA.id, std::vector<uint8_t> {v.begin(), v.end()} }, [start](bool ok) { @@ -214,7 +232,7 @@ void cmd_loop(DhtRunner& dht, dht_params& params) } std::string v; iss >> v; - dht.putSigned(id, dht::Value { + dht->putSigned(id, dht::Value { dht::ValueType::USER_DATA.id, std::vector<uint8_t> {v.begin(), v.end()} }, [start](bool ok) { @@ -230,7 +248,7 @@ void cmd_loop(DhtRunner& dht, dht_params& params) std::string tostr; std::string v; iss >> tostr >> v; - dht.putEncrypted(id, InfoHash(tostr), dht::Value { + dht->putEncrypted(id, InfoHash(tostr), dht::Value { dht::ValueType::USER_DATA.id, std::vector<uint8_t> {v.begin(), v.end()} }, [start](bool ok) { @@ -241,11 +259,45 @@ void cmd_loop(DhtRunner& dht, dht_params& params) else if (op == "a") { in_port_t port; iss >> port; - dht.put(id, dht::Value {dht::IpServiceAnnouncement::TYPE.id, dht::IpServiceAnnouncement(port)}, [start](bool ok) { + dht->put(id, dht::Value {dht::IpServiceAnnouncement::TYPE.id, dht::IpServiceAnnouncement(port)}, [start](bool ok) { auto end = std::chrono::high_resolution_clock::now(); std::cout << "Announce: " << (ok ? "success" : "failure") << " (took " << print_dt(end-start) << "s)" << std::endl; }); } + else if (op == "il") { + indexes.at(index).lookup(createPhtKey(parseStringMap(keystr)), + [=](std::vector<std::shared_ptr<indexation::Value>>& vals, indexation::Prefix p) { + std::cout << "Pht::lookup: at prefix \"" << p.toString() << "\"" << ", hash is " << p.hash() << std::endl; + for (auto v : vals) { + std::cout << "Pht::lookup: found hash" << std::endl + << "\t" << v->first << "[vid: " << v->second << "]" << std::endl; + } + std::cout << "Pht::lookup: done." << std::endl; + }, + [](bool ok) { + if (not ok) { + std::cout << "Pht::lookup: dht Get failed." << std::endl; + } + } + ); + } + else if (op == "ii") { + iss >> idstr; + InfoHash h {idstr}; + if (not isInfoHash(h)) + continue; + + indexation::Value v {h, 0}; + indexes.at(index).insert(createPhtKey(parseStringMap(keystr)), v, + [=](bool success) { + if (not success) { + std::cout << "Pht::insert: failed." << std::endl; + return; + } + std::cout << "Pht::insert: done." << std::endl; + } + ); + } } std::cout << std::endl << "Stopping node..." << std::endl; @@ -257,7 +309,9 @@ main(int argc, char **argv) if (int rc = gnutls_global_init()) // TODO: remove with GnuTLS >= 3.3 throw std::runtime_error(std::string("Error initializing GnuTLS: ")+gnutls_strerror(rc)); - DhtRunner dht; + auto dht = std::make_shared<DhtRunner>(); + std::map<std::string, dht::indexation::Pht> indexes; + try { auto params = parseArgs(argc, argv); if (params.help) { @@ -285,14 +339,14 @@ main(int argc, char **argv) if (not params.bootstrap.first.empty()) { //std::cout << "Bootstrap: " << params.bootstrap.first << ":" << params.bootstrap.second << std::endl; - dht.bootstrap(params.bootstrap.first.c_str(), params.bootstrap.second.c_str()); + dht->bootstrap(params.bootstrap.first.c_str(), params.bootstrap.second.c_str()); } if (params.daemonize) { while (true) std::this_thread::sleep_for(std::chrono::seconds(30)); } else { - cmd_loop(dht, params); + cmd_loop(dht, indexes, params); } } catch(const std::exception&e) { @@ -303,7 +357,7 @@ main(int argc, char **argv) std::mutex m; std::atomic_bool done {false}; - dht.shutdown([&]() + dht->shutdown([&]() { std::lock_guard<std::mutex> lk(m); done = true; @@ -314,7 +368,7 @@ main(int argc, char **argv) std::unique_lock<std::mutex> lk(m); cv.wait(lk, [&](){ return done.load(); }); - dht.join(); + dht->join(); gnutls_global_deinit(); return 0; diff --git a/tools/tools_common.h b/tools/tools_common.h index 45f6b8273e0fbc71973894f3d6b58dcf1e0f2ff7..bd0d19741d21338e9921dfb616af5b9dc5f3e699 100644 --- a/tools/tools_common.h +++ b/tools/tools_common.h @@ -135,6 +135,47 @@ splitPort(const std::string& s) { return {s.substr(0,found), s.substr(found+1)}; } +/* + * The mapString shall have the following format: + * + * k1:v1[,k2:v2[,...]] + */ +std::map<std::string, std::string> parseStringMap(std::string mapString) { + std::istringstream keySs(mapString); + std::string mapStr; + std::map<std::string, std::string> map; + + while (std::getline(keySs, mapStr, ',')) { + std::istringstream mapSs(mapStr); + std::string key, value; + + while (std::getline(mapSs, key, ':')) { + std::getline(mapSs, value, ':'); + map[key] = { value }; + } + } + return map; +} + +dht::indexation::Pht::Key createPhtKey(std::map<std::string, std::string> pht_key_str_map) { + dht::indexation::Pht::Key pht_key; + for (auto f : pht_key_str_map) { + dht::Blob prefix {f.second.begin(), f.second.end()}; + pht_key.emplace(f.first, prefix); + } + return pht_key; +} + +bool isInfoHash(dht::InfoHash& h) { + static constexpr dht::InfoHash INVALID_ID {}; + + if (h == INVALID_ID) { + std::cout << "Syntax error: invalid InfoHash." << std::endl; + return false; + } + return true; +} + static const constexpr in_port_t DHT_DEFAULT_PORT = 4222; struct dht_params {