Skip to content
Snippets Groups Projects
Commit b96d0de8 authored by Simon Désaulniers's avatar Simon Désaulniers Committed by Adrien Béraud
Browse files

python: make use of Filter, Where in DhtRunner.get

parent 98c4e39e
Branches
Tags
No related merge requests found
...@@ -99,6 +99,7 @@ using DoneCallback = std::function<void(bool success, const std::vector<std::sha ...@@ -99,6 +99,7 @@ using DoneCallback = std::function<void(bool success, const std::vector<std::sha
typedef void (*DoneCallbackRaw)(bool, std::vector<std::shared_ptr<Node>>*, void *user_data); typedef void (*DoneCallbackRaw)(bool, std::vector<std::shared_ptr<Node>>*, void *user_data);
typedef void (*ShutdownCallbackRaw)(void *user_data); typedef void (*ShutdownCallbackRaw)(void *user_data);
typedef void (*DoneCallbackSimpleRaw)(bool, void *user_data); typedef void (*DoneCallbackSimpleRaw)(bool, void *user_data);
typedef bool (*FilterRaw)(const Value&, void *user_data);
using DoneCallbackSimple = std::function<void(bool success)>; using DoneCallbackSimple = std::function<void(bool success)>;
...@@ -106,5 +107,6 @@ OPENDHT_PUBLIC ShutdownCallback bindShutdownCb(ShutdownCallbackRaw shutdown_cb_r ...@@ -106,5 +107,6 @@ OPENDHT_PUBLIC ShutdownCallback bindShutdownCb(ShutdownCallbackRaw shutdown_cb_r
OPENDHT_PUBLIC DoneCallback bindDoneCb(DoneCallbackSimple donecb); OPENDHT_PUBLIC DoneCallback bindDoneCb(DoneCallbackSimple donecb);
OPENDHT_PUBLIC DoneCallback bindDoneCb(DoneCallbackRaw raw_cb, void* user_data); OPENDHT_PUBLIC DoneCallback bindDoneCb(DoneCallbackRaw raw_cb, void* user_data);
OPENDHT_PUBLIC DoneCallbackSimple bindDoneCbSimple(DoneCallbackSimpleRaw raw_cb, void* user_data); OPENDHT_PUBLIC DoneCallbackSimple bindDoneCbSimple(DoneCallbackSimpleRaw raw_cb, void* user_data);
OPENDHT_PUBLIC Value::Filter bindFilterRaw(FilterRaw raw_filter, void* user_data);
} }
...@@ -58,10 +58,12 @@ cdef inline void shutdown_callback(void* user_data) with gil: ...@@ -58,10 +58,12 @@ cdef inline void shutdown_callback(void* user_data) with gil:
ref.Py_DECREF(cbs) ref.Py_DECREF(cbs)
cdef inline bool get_callback(shared_ptr[cpp.Value] value, void *user_data) with gil: cdef inline bool get_callback(shared_ptr[cpp.Value] value, void *user_data) with gil:
cb = (<object>user_data)['get'] cbs = <object>user_data
cb = cbs['get']
f = cbs['filter'] if 'filter' in cbs else None
pv = Value() pv = Value()
pv._value = value pv._value = value
return cb(pv) return cb(pv) if not f or f(pv) else True
cdef inline void done_callback(bool done, cpp.vector[shared_ptr[cpp.Node]]* nodes, void *user_data) with gil: cdef inline void done_callback(bool done, cpp.vector[shared_ptr[cpp.Node]]* nodes, void *user_data) with gil:
node_ids = [] node_ids = []
...@@ -513,7 +515,7 @@ cdef class DhtRunner(_WithID): ...@@ -513,7 +515,7 @@ cdef class DhtRunner(_WithID):
stats.append(n) stats.append(n)
return stats return stats
def get(self, InfoHash key, get_cb=None, done_cb=None): def get(self, InfoHash key, get_cb=None, done_cb=None, filter=None, Where where=None):
"""Retreive values associated with a key on the DHT. """Retreive values associated with a key on the DHT.
key -- the key for which to search key -- the key for which to search
...@@ -523,9 +525,12 @@ cdef class DhtRunner(_WithID): ...@@ -523,9 +525,12 @@ cdef class DhtRunner(_WithID):
operation is completed. operation is completed.
""" """
if get_cb: if get_cb:
cb_obj = {'get':get_cb, 'done':done_cb} cb_obj = {'get':get_cb, 'done':done_cb, 'filter':filter}
ref.Py_INCREF(cb_obj) ref.Py_INCREF(cb_obj)
self.thisptr.get().get(key._infohash, cpp.bindGetCb(get_callback, <void*>cb_obj), cpp.bindDoneCb(done_callback, <void*>cb_obj)) self.thisptr.get().get(key._infohash, cpp.bindGetCb(get_callback, <void*>cb_obj),
cpp.bindDoneCb(done_callback, <void*>cb_obj),
cpp.nullptr, #filter implemented in the get_callback
where._where)
else: else:
lock = threading.Condition() lock = threading.Condition()
pending = 0 pending = 0
...@@ -541,7 +546,7 @@ cdef class DhtRunner(_WithID): ...@@ -541,7 +546,7 @@ cdef class DhtRunner(_WithID):
lock.notify() lock.notify()
with lock: with lock:
pending += 1 pending += 1
self.get(key, get_cb=tmp_get, done_cb=tmp_done) self.get(key, get_cb=tmp_get, done_cb=tmp_done, filter=filter, where=where)
while pending > 0: while pending > 0:
lock.wait() lock.wait()
return res return res
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
# along with this program; If not, see <https://www.gnu.org/licenses/>. # along with this program; If not, see <https://www.gnu.org/licenses/>.
from libc.stdint cimport * from libc.stdint cimport *
from libcpp cimport bool from libcpp cimport bool, nullptr_t, nullptr
from libcpp.string cimport string from libcpp.string cimport string
from libcpp.vector cimport vector from libcpp.vector cimport vector
from libcpp.utility cimport pair from libcpp.utility cimport pair
...@@ -228,7 +228,7 @@ cdef extern from "opendht/dhtrunner.h" namespace "dht": ...@@ -228,7 +228,7 @@ cdef extern from "opendht/dhtrunner.h" namespace "dht":
string getStorageLog() const string getStorageLog() const
string getRoutingTablesLog(sa_family_t af) const string getRoutingTablesLog(sa_family_t af) const
string getSearchesLog(sa_family_t af) const string getSearchesLog(sa_family_t af) const
void get(InfoHash key, GetCallback get_cb, DoneCallback done_cb) void get(InfoHash key, GetCallback get_cb, DoneCallback done_cb, nullptr_t f, Where w)
void put(InfoHash key, shared_ptr[Value] val, DoneCallback done_cb) void put(InfoHash key, shared_ptr[Value] val, DoneCallback done_cb)
ListenToken listen(InfoHash key, GetCallback get_cb) ListenToken listen(InfoHash key, GetCallback get_cb)
void cancelListen(InfoHash key, SharedListenToken token) void cancelListen(InfoHash key, SharedListenToken token)
......
...@@ -26,6 +26,13 @@ namespace dht { ...@@ -26,6 +26,13 @@ namespace dht {
const std::string Query::QUERY_PARSE_ERROR {"Error parsing query."}; const std::string Query::QUERY_PARSE_ERROR {"Error parsing query."};
Value::Filter bindFilterRaw(FilterRaw raw_filter, void* user_data) {
if (not raw_filter) return {};
return [=](const Value& value) {
return raw_filter(value, user_data);
};
}
std::ostream& operator<< (std::ostream& s, const Value& v) std::ostream& operator<< (std::ostream& s, const Value& v)
{ {
s << "Value[id:" << std::hex << v.id << std::dec << " "; s << "Value[id:" << std::hex << v.id << std::dec << " ";
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment