Skip to content
Snippets Groups Projects
Commit d45f7770 authored by Simon Désaulniers's avatar Simon Désaulniers Committed by Adrien Béraud
Browse files

pht tools

parent 9c4035dd
Branches
Tags
No related merge requests found
......@@ -105,9 +105,7 @@ struct Prefix {
using Value = std::pair<InfoHash, dht::Value::Id>;
struct IndexEntry : public dht::Value::Serializable<IndexEntry> {
const ValueType& getType() const {
return ValueType::USER_DATA;
}
static const ValueType TYPE;
virtual void unpackValue(const dht::Value& v) {
Serializable<IndexEntry>::unpackValue(v);
......
......@@ -41,6 +41,17 @@ cimport opendht_cpp as cpp
import threading
cdef inline void lookup_callback(cpp.vector[cpp.shared_ptr[cpp.IndexValue]]* values, cpp.Prefix* p, void *user_data) with gil:
cbs = <object>user_data
if 'lookup' in cbs and cbs['lookup']:
vals = []
for val in deref(values):
v = IndexValue()
v._value = val
vals.append(v)
cbs['lookup'](vals, p.toString())
ref.Py_DECREF(cbs)
cdef inline void shutdown_callback(void* user_data) with gil:
cbs = <object>user_data
if 'shutdown' in cbs and cbs['shutdown']:
......@@ -65,6 +76,12 @@ cdef inline void done_callback(bool done, cpp.vector[shared_ptr[cpp.Node]]* node
cbs['done'](done, node_ids)
ref.Py_DECREF(cbs)
cdef inline void done_callback_simple(bool done, void *user_data) with gil:
cbs = <object>user_data
if 'done' in cbs and cbs['done']:
cbs['done'](done)
ref.Py_DECREF(cbs)
cdef class _WithID(object):
def __repr__(self):
return "<%s '%s'>" % (self.__class__.__name__, str(self))
......@@ -73,7 +90,7 @@ cdef class _WithID(object):
cdef class InfoHash(_WithID):
cdef cpp.InfoHash _infohash
def __init__(self, bytes str=b''):
def __cinit__(self, bytes str=b''):
self._infohash = cpp.InfoHash(str)
def __bool__(InfoHash self):
return not (self._infohash == cpp.InfoHash())
......@@ -255,28 +272,28 @@ cdef class DhtConfig(object):
self._config.dht_config.node_config.node_id = id._infohash
cdef class DhtRunner(_WithID):
cdef cpp.DhtRunner* thisptr
cdef cpp.shared_ptr[cpp.DhtRunner] thisptr
def __cinit__(self):
self.thisptr = new cpp.DhtRunner()
self.thisptr.reset(new cpp.DhtRunner())
def getId(self):
h = InfoHash()
h._infohash = self.thisptr.getId()
h._infohash = self.thisptr.get().getId()
return h
def getNodeId(self):
return self.thisptr.getNodeId().toString()
return self.thisptr.get().getNodeId().toString()
def bootstrap(self, str host, str port):
self.thisptr.bootstrap(host.encode(), port.encode())
self.thisptr.get().bootstrap(host.encode(), port.encode())
def run(self, Identity id=None, is_bootstrap=False, cpp.in_port_t port=0, str ipv4="", str ipv6="", DhtConfig config=DhtConfig()):
if id:
config.setIdentity(id)
if ipv4 or ipv6:
bind4 = ipv4.encode() if ipv4 else b''
bind6 = ipv6.encode() if ipv6 else b''
self.thisptr.run(bind4, bind6, str(port).encode(), config._config)
self.thisptr.get().run(bind4, bind6, str(port).encode(), config._config)
else:
self.thisptr.run(port, config._config)
self.thisptr.get().run(port, config._config)
def join(self):
self.thisptr.join()
self.thisptr.get().join()
def shutdown(self, shutdown_cb=None):
cb_obj = {'shutdown':shutdown_cb}
ref.Py_INCREF(cb_obj)
......@@ -288,16 +305,16 @@ cdef class DhtRunner(_WithID):
def enableFileLogging(self, str path):
cpp.enableFileLogging(self.thisptr[0], path)
def isRunning(self):
return self.thisptr.isRunning()
return self.thisptr.get().isRunning()
def getStorageLog(self):
return self.thisptr.getStorageLog().decode()
return self.thisptr.get().getStorageLog().decode()
def getRoutingTablesLog(self, cpp.sa_family_t af):
return self.thisptr.getRoutingTablesLog(af).decode()
return self.thisptr.get().getRoutingTablesLog(af).decode()
def getSearchesLog(self, cpp.sa_family_t af):
return self.thisptr.getSearchesLog(af).decode()
return self.thisptr.get().getSearchesLog(af).decode()
def getNodeMessageStats(self):
stats = []
cdef cpp.vector[unsigned] res = self.thisptr.getNodeMessageStats(False)
cdef cpp.vector[unsigned] res = self.thisptr.get().getNodeMessageStats(False)
for n in res:
stats.append(n)
return stats
......@@ -305,9 +322,11 @@ cdef class DhtRunner(_WithID):
def get(self, InfoHash key, get_cb=None, done_cb=None):
"""Retreive values associated with a key on the DHT.
key -- the key for which to search
get_cb -- is set, makes the operation non-blocking. Called when a value is found on the DHT.
done_cb -- optional callback used when get_cb is set. Called when the operation is completed.
key -- the key for which to search
get_cb -- is set, makes the operation non-blocking. Called when a value
is found on the DHT.
done_cb -- optional callback used when get_cb is set. Called when the
operation is completed.
"""
if get_cb:
cb_obj = {'get':get_cb, 'done':done_cb}
......@@ -335,8 +354,8 @@ cdef class DhtRunner(_WithID):
def put(self, InfoHash key, Value val, done_cb=None):
"""Publish a new value on the DHT at key.
key -- the DHT key where to put the value
val -- the value to put on the DHT
key -- the DHT key where to put the value
val -- the value to put on the DHT
done_cb -- optional callback called when the operation is completed.
"""
if done_cb:
......@@ -369,6 +388,62 @@ cdef class DhtRunner(_WithID):
t._t = self.thisptr.listen(t._h, cpp.bindGetCb(get_callback, <void*>cb_obj)).share()
return t
def cancelListen(self, ListenToken token):
self.thisptr.cancelListen(token._h, token._t)
self.thisptr.get().cancelListen(token._h, token._t)
ref.Py_DECREF(<object>token._cb['cb'])
# fixme: not thread safe
cdef class IndexValue(object):
cdef cpp.shared_ptr[cpp.IndexValue] _value
def __init__(self, InfoHash h, cpp.uint64_t vid=0):
cdef cpp.InfoHash hh = h._infohash
self._value.reset(new cpp.IndexValue(hh, vid))
def getKey(self):
h = InfoHash()
h._infohash = self._value.get().first
return h
def getValueId(self):
return self._value.get().second
cdef class Pht(object):
cdef cpp.Pht* thisptr
def __cinit__(self, bytes name, DhtRunner dht):
self.thisptr = new cpp.Pht(name, dht.thisptr)
def lookup(self, key, lookup_cb=None, done_cb=None):
"""Query the Index with a specified key.
key -- the key for to the entry in the index.
lookup_cb -- function called when the operation is completed. This
function takes a list of IndexValue objects and a string
representation of the prefix where the value was indexed in
the PHT.
"""
cb_obj = {'lookup':lookup_cb, 'done':done_cb} # TODO: donecallback is to be removed
ref.Py_INCREF(cb_obj)
cdef cpp.IndexKey cppk
for kk, v in key.items():
cppk[bytes(kk, 'utf-8')] = cpp.Prefix(bytes(v))
self.thisptr.lookup(
cppk,
cpp.Pht.bindLookupCb(lookup_callback, <void*>cb_obj),
cpp.Dht.bindDoneCbSimple(done_callback_simple, <void*>cb_obj)
)
def insert(self, key, IndexValue value, done_cb=None):
"""Add an index entry to the Index.
key -- the key for to the entry in the index.
value -- an IndexValue object describing the indexed value.
done_cb -- Called when the operation is completed.
"""
cb_obj = {'done':done_cb}
cdef cpp.IndexKey cppk
for kk, v in key.items():
cppk[bytes(kk, 'utf-8')] = cpp.Prefix(bytes(v))
cdef cpp.IndexValue val
val.first = (<InfoHash>value.getKey())._infohash
val.second = value.getValueId()
self.thisptr.insert(
cppk,
val,
cpp.Dht.bindDoneCbSimple(done_callback_simple, <void*>cb_obj)
)
......@@ -20,11 +20,19 @@ from libcpp cimport bool
from libcpp.string cimport string
from libcpp.vector cimport vector
from libcpp.utility cimport pair
from libcpp.memory cimport shared_ptr
from libcpp.map cimport map
ctypedef uint16_t in_port_t
ctypedef unsigned short int sa_family_t;
cdef extern from "<memory>" namespace "std" nogil:
cdef cppclass shared_ptr[T]:
shared_ptr() except +
shared_ptr(T*) except +
T* get()
T operator*()
void reset(T*)
cdef extern from "<functional>" namespace "std" nogil:
cdef cppclass hash[T]:
hash() except +
......@@ -150,3 +158,20 @@ cdef extern from "opendht/log.h" namespace "dht::log":
void disableLogging(DhtRunner& dht)
void enableFileLogging(DhtRunner& dht, const string& path)
cdef extern from "opendht/indexation/pht.h" namespace "dht::indexation":
cdef cppclass Prefix:
Prefix() except +
Prefix(vector[uint8_t]) except +
string toString() const
ctypedef pair[InfoHash, uint64_t] IndexValue "dht::indexation::Value"
ctypedef map[string, Prefix] IndexKey "dht::indexation::Pht::Key"
ctypedef void (*LookupCallbackRaw)(vector[shared_ptr[IndexValue]]* values, Prefix* p, void* user_data);
cdef cppclass Pht:
cppclass LookupCallback:
LookupCallback() except +
Pht(string, shared_ptr[DhtRunner]) except +
void lookup(IndexKey k, LookupCallback cb, Dht.DoneCallbackSimple doneCb);
void insert(IndexKey k, IndexValue v, Dht.DoneCallbackSimple cb)
@staticmethod
LookupCallback bindLookupCb(LookupCallbackRaw cb, void *user_data)
......@@ -38,10 +38,12 @@ void print_id_req() {
std::cout << "An identity is required to perform this operation (run with -i)" << std::endl;
}
void print_node_info(const DhtRunner& dht, const dht_params& params) {
std::cout << "OpenDht node " << dht.getNodeId() << " running on port " << dht.getBoundPort() << std::endl;
void print_node_info(const std::shared_ptr<DhtRunner>& dht, const dht_params& params) {
std::cout << "OpenDht node " << dht->getNodeId() << " running on port " << dht->getBoundPort() << std::endl;
if (params.is_bootstrap_node)
std::cout << "Running in bootstrap mode (discouraged)." << std::endl;
if (params.generate_identity)
std::cout << "Public key ID " << dht.getId() << std::endl;
std::cout << "Public key ID " << dht->getId() << std::endl;
}
void print_help() {
......@@ -64,11 +66,14 @@ void print_help() {
<< " l [key] [where] Listen for value changes at [key]. [where] is the 'where' part of an SQL-ish string." << std::endl
<< " p [key] [str] Put string value at [key]." << std::endl
<< " s [key] [str] Put string value at [key], signed with our generated private key." << std::endl
<< " e [key] [dest] [str] Put string value at [key], encrypted for [dest] with its public key (if found)." << std::endl
<< " e [key] [dest] [str] Put string value at [key], encrypted for [dest] with its public key (if found)." << std::endl;
std::cout << std::endl << "Indexation operations on the DHT:" << std::endl
<< " il [name] [key] Lookup the index named [name] with the key [key]." << std::endl
<< " ii [name] [key] [value] Inserts the value [value] under the key [key] in the index named [name]." << std::endl
<< std::endl;
}
void cmd_loop(DhtRunner& dht, dht_params& params)
void cmd_loop(std::shared_ptr<DhtRunner>& dht, std::map<std::string, dht::indexation::Pht> indexes, dht_params& params)
{
print_node_info(dht, params);
std::cout << " (type 'h' or 'help' for a list of possible commands)" << std::endl << std::endl;
......@@ -84,8 +89,8 @@ void cmd_loop(DhtRunner& dht, dht_params& params)
break;
std::istringstream iss(line);
std::string op, idstr, value;
iss >> op >> idstr;
std::string op, idstr, value, index, keystr;
iss >> op;
if (op == "x" || op == "exit" || op == "quit") {
break;
......@@ -96,31 +101,32 @@ void cmd_loop(DhtRunner& dht, dht_params& params)
print_node_info(dht, params);
unsigned good4, dubious4, cached4, incoming4;
unsigned good6, dubious6, cached6, incoming6;
dht.getNodesStats(AF_INET, &good4, &dubious4, &cached4, &incoming4);
dht.getNodesStats(AF_INET6, &good6, &dubious6, &cached6, &incoming6);
dht->getNodesStats(AF_INET, &good4, &dubious4, &cached4, &incoming4);
dht->getNodesStats(AF_INET6, &good6, &dubious6, &cached6, &incoming6);
std::cout << "IPv4 nodes : " << good4 << " good, " << dubious4 << " dubious, " << incoming4 << " incoming." << std::endl;
std::cout << "IPv6 nodes : " << good6 << " good, " << dubious6 << " dubious, " << incoming6 << " incoming." << std::endl;
continue;
} else if (op == "lr") {
std::cout << "IPv4 routing table:" << std::endl;
std::cout << dht.getRoutingTablesLog(AF_INET) << std::endl;
std::cout << dht->getRoutingTablesLog(AF_INET) << std::endl;
std::cout << "IPv6 routing table:" << std::endl;
std::cout << dht.getRoutingTablesLog(AF_INET6) << std::endl;
std::cout << dht->getRoutingTablesLog(AF_INET6) << std::endl;
continue;
} else if (op == "ld") {
std::cout << dht.getStorageLog() << std::endl;
std::cout << dht->getStorageLog() << std::endl;
continue;
} else if (op == "ls") {
std::cout << "Searches:" << std::endl;
std::cout << dht.getSearchesLog() << std::endl;
std::cout << dht->getSearchesLog() << std::endl;
continue;
} else if (op == "la") {
std::cout << "Reported public addresses:" << std::endl;
auto addrs = dht.getPublicAddressStr();
auto addrs = dht->getPublicAddressStr();
for (const auto& addr : addrs)
std::cout << addr << std::endl;
continue;
} else if (op == "b") {
iss >> idstr;
try {
auto addr = splitPort(idstr);
if (not addr.first.empty() and addr.second.empty()){
......@@ -128,7 +134,7 @@ void cmd_loop(DhtRunner& dht, dht_params& params)
ss << DHT_DEFAULT_PORT;
addr.second = ss.str();
}
dht.bootstrap(addr.first.c_str(), addr.second.c_str());
dht->bootstrap(addr.first.c_str(), addr.second.c_str());
} catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
}
......@@ -145,24 +151,36 @@ void cmd_loop(DhtRunner& dht, dht_params& params)
if (op.empty())
continue;
dht::InfoHash id {idstr};
static const std::set<std::string> VALID_OPS {"g", "q", "l", "p", "s", "e", "a"};
static const std::set<std::string> VALID_OPS {"g", "l", "il", "ii", "p", "s", "e", "a"};
if (VALID_OPS.find(op) == VALID_OPS.cend()) {
std::cout << "Unknown command: " << op << std::endl;
std::cout << " (type 'h' or 'help' for a list of possible commands)" << std::endl;
continue;
}
static constexpr dht::InfoHash INVALID_ID {};
if (id == INVALID_ID) {
std::cout << "Syntax error: invalid InfoHash." << std::endl;
continue;
if (op == "il" or op == "ii") {
// Pht syntax
iss >> index >> keystr;
if (not index.size()) {
std::cout << "You must enter the index name." << std::endl;
continue;
} else {
indexes.emplace(index, dht::indexation::Pht {index, dht});
}
}
else {
// Dht syntax
iss >> idstr;
InfoHash h {idstr};
if (not isInfoHash(h))
continue;
}
dht::InfoHash id {idstr};
// Dht
auto start = std::chrono::high_resolution_clock::now();
if (op == "g") {
std::string rem;
std::getline(iss, rem);
dht.get(id, [start](std::shared_ptr<Value> value) {
dht->get(id, [start](std::shared_ptr<Value> value) {
auto now = std::chrono::high_resolution_clock::now();
std::cout << "Get: found value (after " << print_dt(now-start) << "s)" << std::endl;
std::cout << "\t" << *value << std::endl;
......@@ -199,7 +217,7 @@ void cmd_loop(DhtRunner& dht, dht_params& params)
else if (op == "p") {
std::string v;
iss >> v;
dht.put(id, dht::Value {
dht->put(id, dht::Value {
dht::ValueType::USER_DATA.id,
std::vector<uint8_t> {v.begin(), v.end()}
}, [start](bool ok) {
......@@ -214,7 +232,7 @@ void cmd_loop(DhtRunner& dht, dht_params& params)
}
std::string v;
iss >> v;
dht.putSigned(id, dht::Value {
dht->putSigned(id, dht::Value {
dht::ValueType::USER_DATA.id,
std::vector<uint8_t> {v.begin(), v.end()}
}, [start](bool ok) {
......@@ -230,7 +248,7 @@ void cmd_loop(DhtRunner& dht, dht_params& params)
std::string tostr;
std::string v;
iss >> tostr >> v;
dht.putEncrypted(id, InfoHash(tostr), dht::Value {
dht->putEncrypted(id, InfoHash(tostr), dht::Value {
dht::ValueType::USER_DATA.id,
std::vector<uint8_t> {v.begin(), v.end()}
}, [start](bool ok) {
......@@ -241,11 +259,45 @@ void cmd_loop(DhtRunner& dht, dht_params& params)
else if (op == "a") {
in_port_t port;
iss >> port;
dht.put(id, dht::Value {dht::IpServiceAnnouncement::TYPE.id, dht::IpServiceAnnouncement(port)}, [start](bool ok) {
dht->put(id, dht::Value {dht::IpServiceAnnouncement::TYPE.id, dht::IpServiceAnnouncement(port)}, [start](bool ok) {
auto end = std::chrono::high_resolution_clock::now();
std::cout << "Announce: " << (ok ? "success" : "failure") << " (took " << print_dt(end-start) << "s)" << std::endl;
});
}
else if (op == "il") {
indexes.at(index).lookup(createPhtKey(parseStringMap(keystr)),
[=](std::vector<std::shared_ptr<indexation::Value>>& vals, indexation::Prefix p) {
std::cout << "Pht::lookup: at prefix \"" << p.toString() << "\"" << ", hash is " << p.hash() << std::endl;
for (auto v : vals) {
std::cout << "Pht::lookup: found hash" << std::endl
<< "\t" << v->first << "[vid: " << v->second << "]" << std::endl;
}
std::cout << "Pht::lookup: done." << std::endl;
},
[](bool ok) {
if (not ok) {
std::cout << "Pht::lookup: dht Get failed." << std::endl;
}
}
);
}
else if (op == "ii") {
iss >> idstr;
InfoHash h {idstr};
if (not isInfoHash(h))
continue;
indexation::Value v {h, 0};
indexes.at(index).insert(createPhtKey(parseStringMap(keystr)), v,
[=](bool success) {
if (not success) {
std::cout << "Pht::insert: failed." << std::endl;
return;
}
std::cout << "Pht::insert: done." << std::endl;
}
);
}
}
std::cout << std::endl << "Stopping node..." << std::endl;
......@@ -257,7 +309,9 @@ main(int argc, char **argv)
if (int rc = gnutls_global_init()) // TODO: remove with GnuTLS >= 3.3
throw std::runtime_error(std::string("Error initializing GnuTLS: ")+gnutls_strerror(rc));
DhtRunner dht;
auto dht = std::make_shared<DhtRunner>();
std::map<std::string, dht::indexation::Pht> indexes;
try {
auto params = parseArgs(argc, argv);
if (params.help) {
......@@ -285,14 +339,14 @@ main(int argc, char **argv)
if (not params.bootstrap.first.empty()) {
//std::cout << "Bootstrap: " << params.bootstrap.first << ":" << params.bootstrap.second << std::endl;
dht.bootstrap(params.bootstrap.first.c_str(), params.bootstrap.second.c_str());
dht->bootstrap(params.bootstrap.first.c_str(), params.bootstrap.second.c_str());
}
if (params.daemonize) {
while (true)
std::this_thread::sleep_for(std::chrono::seconds(30));
} else {
cmd_loop(dht, params);
cmd_loop(dht, indexes, params);
}
} catch(const std::exception&e) {
......@@ -303,7 +357,7 @@ main(int argc, char **argv)
std::mutex m;
std::atomic_bool done {false};
dht.shutdown([&]()
dht->shutdown([&]()
{
std::lock_guard<std::mutex> lk(m);
done = true;
......@@ -314,7 +368,7 @@ main(int argc, char **argv)
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, [&](){ return done.load(); });
dht.join();
dht->join();
gnutls_global_deinit();
return 0;
......
......@@ -135,6 +135,47 @@ splitPort(const std::string& s) {
return {s.substr(0,found), s.substr(found+1)};
}
/*
* The mapString shall have the following format:
*
* k1:v1[,k2:v2[,...]]
*/
std::map<std::string, std::string> parseStringMap(std::string mapString) {
std::istringstream keySs(mapString);
std::string mapStr;
std::map<std::string, std::string> map;
while (std::getline(keySs, mapStr, ',')) {
std::istringstream mapSs(mapStr);
std::string key, value;
while (std::getline(mapSs, key, ':')) {
std::getline(mapSs, value, ':');
map[key] = { value };
}
}
return map;
}
dht::indexation::Pht::Key createPhtKey(std::map<std::string, std::string> pht_key_str_map) {
dht::indexation::Pht::Key pht_key;
for (auto f : pht_key_str_map) {
dht::Blob prefix {f.second.begin(), f.second.end()};
pht_key.emplace(f.first, prefix);
}
return pht_key;
}
bool isInfoHash(dht::InfoHash& h) {
static constexpr dht::InfoHash INVALID_ID {};
if (h == INVALID_ID) {
std::cout << "Syntax error: invalid InfoHash." << std::endl;
return false;
}
return true;
}
static const constexpr in_port_t DHT_DEFAULT_PORT = 4222;
struct dht_params {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment