-
Adrien Béraud authoredAdrien Béraud authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
opendht.pyx 12.16 KiB
# distutils: language = c++
# distutils: extra_compile_args = -std=c++11
# distutils: include_dirs = ../../include
# distutils: library_dirs = ../../src
# distutils: libraries = opendht gnutls
# cython: language_level=3
#
# Copyright (c) 2015-2016 Savoir-faire Linux Inc.
# Author(s): Guillaume Roguez <guillaume.roguez@savoirfairelinux.com>
# Adrien Béraud <adrien.beraud@savoirfairelinux.com>
# Simon Désaulniers <sim.desaulniers@gmail.com>
#
# This wrapper is written for Cython 0.22
#
# This file is part of OpenDHT Python Wrapper.
#
# OpenDHT Python Wrapper is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenDHT Python Wrapper is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OpenDHT Python Wrapper. If not, see <http://www.gnu.org/licenses/>.
from libcpp.map cimport map as map
from libcpp cimport bool
from libcpp.utility cimport pair
from libcpp.string cimport string
from cython.parallel import parallel, prange
from cython.operator cimport dereference as deref, preincrement as inc, predecrement as dec
from cpython cimport ref
cimport opendht_cpp as cpp
import threading
cdef inline void shutdown_callback(void* user_data) with gil:
cbs = <object>user_data
if 'shutdown' in cbs and cbs['shutdown']:
cbs['shutdown']()
ref.Py_DECREF(cbs)
cdef inline bool get_callback(cpp.shared_ptr[cpp.Value] value, void *user_data) with gil:
cb = (<object>user_data)['get']
pv = Value()
pv._value = value
return cb(pv)
cdef inline void done_callback(bool done, cpp.vector[cpp.shared_ptr[cpp.Node]]* nodes, void *user_data) with gil:
node_ids = []
for n in deref(nodes):
h = NodeEntry()
h._v.first = n.get().getId()
h._v.second = n
node_ids.append(h)
cbs = <object>user_data
if 'done' in cbs and cbs['done']:
cbs['done'](done, node_ids)
ref.Py_DECREF(cbs)
cdef class _WithID(object):
def __repr__(self):
return "<%s '%s'>" % (self.__class__.__name__, str(self))
def __str__(self):
return self.getId().toString().decode()
cdef class InfoHash(_WithID):
cdef cpp.InfoHash _infohash
def __init__(self, bytes str=b''):
self._infohash = cpp.InfoHash(str)
def __bool__(InfoHash self):
return not (self._infohash == cpp.InfoHash())
def __richcmp__(InfoHash self, InfoHash other, int op):
if op == 0:
return self._infohash < other._infohash
if op == 1:
return self._infohash < other._infohash or self._infohash == other._infohash
if op == 2:
return self._infohash == other._infohash
return NotImplemented
def __hash__(self):
cdef cpp.hash[cpp.InfoHash] hash_fn
return hash_fn.get(self._infohash)
def getBit(InfoHash self, bit):
return self._infohash.getBit(bit)
def setBit(InfoHash self, bit, b):
self._infohash.setBit(bit, b)
def getId(InfoHash self):
return self
def toString(InfoHash self):
return self._infohash.toString()
def toFloat(InfoHash self):
return self._infohash.toFloat()
@staticmethod
def commonBits(InfoHash a, InfoHash b):
return cpp.InfoHash.commonBits(a._infohash, b._infohash)
@staticmethod
def get(str key):
h = InfoHash()
h._infohash = cpp.InfoHash.get(key.encode())
return h
@staticmethod
def getRandom():
h = InfoHash()
h._infohash = cpp.InfoHash.getRandom()
return h
cdef class Node(_WithID):
cdef cpp.shared_ptr[cpp.Node] _node
def getId(self):
h = InfoHash()
h._infohash = self._node.get().getId()
return h
def getAddr(self):
return self._node.get().getAddrStr()
def isExpired(self):
return self._node.get().isExpired()
cdef class NodeEntry(_WithID):
cdef cpp.pair[cpp.InfoHash, cpp.shared_ptr[cpp.Node]] _v
def getId(self):
h = InfoHash()
h._infohash = self._v.first
return h
def getNode(self):
n = Node()
n._node = self._v.second
return n
cdef class Value(object):
cdef cpp.shared_ptr[cpp.Value] _value
def __init__(self, bytes val=b''):
self._value.reset(new cpp.Value(val, len(val)))
def __str__(self):
return self._value.get().toString().decode()
property owner:
def __get__(self):
h = InfoHash()
h._infohash = self._value.get().owner.getId()
return h
property recipient:
def __get__(self):
h = InfoHash()
h._infohash = self._value.get().recipient
return h
def __set__(self, InfoHash h):
self._value.get().recipient = h._infohash
property data:
def __get__(self):
return string(<char*>self._value.get().data.data(), self._value.get().data.size())
def __set__(self, bytes value):
self._value.get().data = value
property id:
def __get__(self):
return self._value.get().id
def __set__(self, cpp.uint64_t value):
self._value.get().id = value
cdef class NodeSetIter(object):
cdef map[cpp.InfoHash, cpp.shared_ptr[cpp.Node]]* _nodes
cdef map[cpp.InfoHash, cpp.shared_ptr[cpp.Node]].iterator _curIter
def __init__(self, NodeSet s):
self._nodes = &s._nodes
self._curIter = self._nodes.begin()
def __next__(self):
if self._curIter == self._nodes.end():
raise StopIteration
h = NodeEntry()
h._v = deref(self._curIter)
inc(self._curIter)
return h
cdef class NodeSet(object):
cdef map[cpp.InfoHash, cpp.shared_ptr[cpp.Node]] _nodes
def size(self):
return self._nodes.size()
def insert(self, NodeEntry l):
self._nodes.insert(l._v)
def extend(self, li):
for n in li:
self.insert(n)
def first(self):
if self._nodes.empty():
raise IndexError()
h = InfoHash()
h._infohash = deref(self._nodes.begin()).first
return h
def last(self):
if self._nodes.empty():
raise IndexError()
h = InfoHash()
h._infohash = deref(dec(self._nodes.end())).first
return h
def __str__(self):
s = ''
cdef map[cpp.InfoHash, cpp.shared_ptr[cpp.Node]].iterator it = self._nodes.begin()
while it != self._nodes.end():
s += deref(it).first.toString().decode() + ' ' + deref(it).second.get().getAddrStr().decode() + '\n'
inc(it)
return s
def __iter__(self):
return NodeSetIter(self)
cdef class PublicKey(_WithID):
cdef cpp.PublicKey _key
def getId(self):
h = InfoHash()
h._infohash = self._key.getId()
return h
cdef class Certificate(_WithID):
cdef cpp.shared_ptr[cpp.Certificate] _cert
def getId(self):
h = InfoHash()
h._infohash = self._cert.get().getId()
return h
cdef class ListenToken(object):
cdef cpp.InfoHash _h
cdef cpp.shared_future[size_t] _t
_cb = dict()
cdef class Identity(object):
cdef cpp.Identity _id
def generate(self, str name = "pydht", Identity ca = Identity(), unsigned bits = 4096):
self._id = cpp.generateIdentity(name.encode(), ca._id, bits)
property PublicKey:
def __get__(self):
k = PublicKey()
k._key = self._id.first.get().getPublicKey()
return k
property Certificate:
def __get__(self):
c = Certificate()
c._cert = self._id.second
return c
cdef class DhtConfig(object):
cdef cpp.Config _config
def __init__(self):
self._config = cpp.Config()
self._config.threaded = True;
def setIdentity(self, Identity id):
self._config.dht_config.id = id._id
def setBootstrapMode(self, bool bootstrap):
self._config.dht_config.node_config.is_bootstrap = bootstrap
def setNodeId(self, InfoHash id):
self._config.dht_config.node_config.node_id = id._infohash
cdef class DhtRunner(_WithID):
cdef cpp.DhtRunner* thisptr
def __cinit__(self):
self.thisptr = new cpp.DhtRunner()
def getId(self):
h = InfoHash()
h._infohash = self.thisptr.getId()
return h
def getNodeId(self):
return self.thisptr.getNodeId().toString()
def bootstrap(self, str host, str port):
self.thisptr.bootstrap(host.encode(), port.encode())
def run(self, Identity id=None, is_bootstrap=False, cpp.in_port_t port=0, str ipv4="", str ipv6="", DhtConfig config=DhtConfig()):
if id:
config.setIdentity(id)
if ipv4 or ipv6:
bind4 = ipv4.encode() if ipv4 else b''
bind6 = ipv6.encode() if ipv6 else b''
self.thisptr.run(bind4, bind6, str(port).encode(), config._config)
else:
self.thisptr.run(port, config._config)
def join(self):
self.thisptr.join()
def shutdown(self, shutdown_cb=None):
cb_obj = {'shutdown':shutdown_cb}
ref.Py_INCREF(cb_obj)
self.thisptr.shutdown(cpp.Dht.bindShutdownCb(shutdown_callback, <void*>cb_obj))
def isRunning(self):
return self.thisptr.isRunning()
def getStorageLog(self):
return self.thisptr.getStorageLog().decode()
def getRoutingTablesLog(self, cpp.sa_family_t af):
return self.thisptr.getRoutingTablesLog(af).decode()
def getSearchesLog(self, cpp.sa_family_t af):
return self.thisptr.getSearchesLog(af).decode()
def getNodeMessageStats(self):
stats = []
cdef cpp.vector[unsigned] res = self.thisptr.getNodeMessageStats(False)
for n in res:
stats.append(n)
return stats
def get(self, InfoHash key, get_cb=None, done_cb=None):
"""Retreive values associated with a key on the DHT.
key -- the key for which to search
get_cb -- is set, makes the operation non-blocking. Called when a value is found on the DHT.
done_cb -- optional callback used when get_cb is set. Called when the operation is completed.
"""
if get_cb:
cb_obj = {'get':get_cb, 'done':done_cb}
ref.Py_INCREF(cb_obj)
self.thisptr.get(key._infohash, cpp.Dht.bindGetCb(get_callback, <void*>cb_obj), cpp.Dht.bindDoneCb(done_callback, <void*>cb_obj))
else:
lock = threading.Condition()
pending = 0
res = []
def tmp_get(v):
nonlocal res
res.append(v)
return True
def tmp_done(ok, nodes):
nonlocal pending, lock
with lock:
pending -= 1
lock.notify()
with lock:
pending += 1
self.get(key, get_cb=tmp_get, done_cb=tmp_done)
while pending > 0:
lock.wait()
return res
def put(self, InfoHash key, Value val, done_cb=None):
"""Publish a new value on the DHT at key.
key -- the DHT key where to put the value
val -- the value to put on the DHT
done_cb -- optional callback called when the operation is completed.
"""
cb_obj = {'done':done_cb}
ref.Py_INCREF(cb_obj)
self.thisptr.put(key._infohash, val._value, cpp.Dht.bindDoneCb(done_callback, <void*>cb_obj))
def listen(self, InfoHash key, get_cb):
t = ListenToken()
t._h = key._infohash
cb_obj = {'get':get_cb}
t._cb['cb'] = cb_obj
# avoid the callback being destructed if the token is destroyed
ref.Py_INCREF(cb_obj)
t._t = self.thisptr.listen(t._h, cpp.Dht.bindGetCb(get_callback, <void*>cb_obj)).share()
return t
def cancelListen(self, ListenToken token):
self.thisptr.cancelListen(token._h, token._t)
ref.Py_DECREF(<object>token._cb['cb'])
# fixme: not thread safe