diff --git a/include/opendht/dht.h b/include/opendht/dht.h index e2ef548b13756e13d7dae379b1350dacc654f75f..3c0d139d262016c0b856aa074d725569b3fa1caf 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -133,6 +133,7 @@ public: typedef std::function<bool(const std::vector<std::shared_ptr<Value>>& values)> GetCallback; typedef std::function<bool(std::shared_ptr<Value> value)> GetCallbackSimple; + typedef std::function<void()> ShutdownCallback; typedef bool (*GetCallbackRaw)(std::shared_ptr<Value>, void *user_data); @@ -156,9 +157,14 @@ public: typedef std::function<void(bool success, const std::vector<std::shared_ptr<Node>>& nodes)> DoneCallback; typedef void (*DoneCallbackRaw)(bool, std::vector<std::shared_ptr<Node>>*, void *user_data); + typedef void (*ShutdownCallbackRaw)(void *user_data); typedef std::function<void(bool success)> DoneCallbackSimple; + static ShutdownCallback + bindShutdownCb(ShutdownCallbackRaw shutdown_cb_raw, void* user_data) { + return [=]() { shutdown_cb_raw(user_data); }; + } static DoneCallback bindDoneCb(DoneCallbackSimple donecb) { if (not donecb) return {}; @@ -198,6 +204,11 @@ public: return std::max(getStatus(AF_INET), getStatus(AF_INET6)); } + /** + * Performs final operations before quitting. + */ + void shutdown(ShutdownCallback cb); + /** * Returns true if the node is running (have access to an open socket). * diff --git a/include/opendht/dhtrunner.h b/include/opendht/dhtrunner.h index f37105eb114785c39dcf5aa99334b354f2040d83..f24625cdbb14f7808d0fc1e3f3269f58c2c5f0a0 100644 --- a/include/opendht/dhtrunner.h +++ b/include/opendht/dhtrunner.h @@ -373,6 +373,11 @@ public: return loop_(); } + /** + * Gracefuly disconnect from network. + */ + void shutdown(Dht::ShutdownCallback cb); + /** * Quit and wait for all threads to terminate. * No callbacks will be called after this method returns. diff --git a/python/opendht.pyx b/python/opendht.pyx index 6824a2e2ce957e7bfbb11bb6d0e12c16b5928b5b..7789bfd1894afaf9493a11336c25f01992bfa85a 100644 --- a/python/opendht.pyx +++ b/python/opendht.pyx @@ -5,12 +5,12 @@ # distutils: libraries = opendht gnutls # cython: language_level=3 # -# Copyright (c) 2015 Savoir-Faire Linux Inc. +# Copyright (c) 2015 Savoir-Faire Linux Inc. # Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> # Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com> # # This wrapper is written for Cython 0.22 -# +# # This file is part of OpenDHT Python Wrapper. # # OpenDHT Python Wrapper is free software: you can redistribute it and/or modify @@ -40,6 +40,12 @@ cimport opendht_cpp as cpp import threading +cdef inline void shutdown_callback(void* user_data) with gil: + cbs = <object>user_data + if 'shutdown' in cbs and cbs['shutdown']: + cbs['shutdown']() + ref.Py_DECREF(cbs) + cdef inline bool get_callback(cpp.shared_ptr[cpp.Value] value, void *user_data) with gil: cb = (<object>user_data)['get'] pv = Value() @@ -239,6 +245,8 @@ cdef class DhtConfig(object): self._config.dht_config.id = id._id def setBootstrapMode(self, bool bootstrap): self._config.dht_config.node_config.is_bootstrap = bootstrap + def setNodeId(self, InfoHash id): + self._config.dht_config.node_config.node_id = id._infohash cdef class DhtRunner(_WithID): cdef cpp.DhtRunner* thisptr @@ -252,15 +260,21 @@ cdef class DhtRunner(_WithID): return self.thisptr.getNodeId().toString() def bootstrap(self, str host, str port): self.thisptr.bootstrap(host.encode(), port.encode()) - def run(self, Identity id = Identity(), is_bootstrap=False, cpp.in_port_t port=0, str ipv4="", str ipv6=""): - config = DhtConfig() - config.setIdentity(id) + def run(self, Identity id=None, is_bootstrap=False, cpp.in_port_t port=0, str ipv4="", str ipv6="", DhtConfig config=DhtConfig()): + if id: + config.setIdentity(id) if ipv4 or ipv6: - self.thisptr.run(ipv4.encode(), ipv6.encode(), str(port).encode(), config._config) + bind4 = ipv4.encode() if ipv4 else b'' + bind6 = ipv6.encode() if ipv6 else b'' + self.thisptr.run(bind4, bind6, str(port).encode(), config._config) else: self.thisptr.run(port, config._config) def join(self): self.thisptr.join() + def shutdown(self, shutdown_cb=None): + cb_obj = {'shutdown':shutdown_cb} + ref.Py_INCREF(cb_obj) + self.thisptr.shutdown(cpp.Dht.bindShutdownCb(shutdown_callback, <void*>cb_obj)) def isRunning(self): return self.thisptr.isRunning() def getStorageLog(self): @@ -327,5 +341,5 @@ cdef class DhtRunner(_WithID): return t def cancelListen(self, ListenToken token): self.thisptr.cancelListen(token._h, token._t) - # fixme: not thread safe ref.Py_DECREF(<object>token._cb['cb']) + # fixme: not thread safe diff --git a/python/opendht_cpp.pxd b/python/opendht_cpp.pxd index 36053cd41de18a64202f739650c5cab1c572958c..2c2bd7a405a5f18fd1524240752884e8e5dec9cc 100644 --- a/python/opendht_cpp.pxd +++ b/python/opendht_cpp.pxd @@ -80,12 +80,15 @@ cdef extern from "opendht/dht.h" namespace "dht": InfoHash getId() const string getAddrStr() const bool isExpired() const + ctypedef void (*ShutdownCallbackRaw)(void *user_data) ctypedef bool (*GetCallbackRaw)(shared_ptr[Value] values, void *user_data) ctypedef void (*DoneCallbackRaw)(bool done, vector[shared_ptr[Node]]* nodes, void *user_data) cdef cppclass Dht: cppclass Config: InfoHash node_id bool is_bootstrap + cppclass ShutdownCallback: + ShutdownCallback() except + cppclass GetCallback: GetCallback() except + #GetCallback(GetCallbackRaw cb, void *user_data) except + @@ -95,6 +98,8 @@ cdef extern from "opendht/dht.h" namespace "dht": Dht() except + InfoHash getNodeId() const @staticmethod + ShutdownCallback bindShutdownCb(ShutdownCallbackRaw cb, void *user_data) + @staticmethod GetCallback bindGetCb(GetCallbackRaw cb, void *user_data) @staticmethod DoneCallback bindDoneCb(DoneCallbackRaw cb, void *user_data) @@ -119,6 +124,7 @@ cdef extern from "opendht/dhtrunner.h" namespace "dht": void run(in_port_t, Config config) void run(const char*, const char*, const char*, Config config) void join() + void shutdown(Dht.ShutdownCallback) bool isRunning() string getStorageLog() const string getRoutingTablesLog(sa_family_t af) const diff --git a/src/dht.cpp b/src/dht.cpp index 9a1f12856569611f130f132236aeaa9a1f8e918a..23dddde14e0ebb3f5aff4e97811dfd6df1e09cd5 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -180,6 +180,11 @@ Dht::getStatus(sa_family_t af) const return Status::Connected; } +void +Dht::shutdown(ShutdownCallback cb) { + if (cb) { cb(); } +} + bool Dht::isRunning(sa_family_t af) const { diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index 63638c931f8b92f841483111b234b99fb6f349fb..0822d241fe276d9380f46530a8a6ae1c6764f36e 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -118,6 +118,15 @@ DhtRunner::run(const sockaddr_in* local4, const sockaddr_in6* local6, DhtRunner: }); } +void +DhtRunner::shutdown(Dht::ShutdownCallback cb) { + std::lock_guard<std::mutex> lck(storage_mtx); + pending_ops.emplace([=](SecureDht& dht) mutable { + dht.shutdown(cb); + }); + cv.notify_all(); +} + void DhtRunner::join() {