Files
pubky-core-ffi/bindings/python/pubkycore/pubkycore.py
coreyphillips d45279df02 refactor: update naming convention
Updates naming convention to pubkycore throughout project.
Updated bindings accordingly.
Added python bindings.
2024-10-29 18:41:19 -04:00

1441 lines
52 KiB
Python

# This file was autogenerated by some hot garbage in the `uniffi` crate.
# Trust me, you don't want to mess with it!
# Common helper code.
#
# Ideally this would live in a separate .py file where it can be unittested etc
# in isolation, and perhaps even published as a re-useable package.
#
# However, it's important that the details of how this helper code works (e.g. the
# way that different builtin types are passed across the FFI) exactly match what's
# expected by the rust code on the other side of the interface. In practice right
# now that means coming from the exact same version of `uniffi` that was used to
# compile the rust component. The easiest way to ensure this is to bundle the Python
# helpers directly inline like we're doing here.
import os
import sys
import ctypes
import enum
import struct
import contextlib
import datetime
import typing
import platform
# Used for default argument values.
# `object()` creates a fresh instance, so this sentinel is identical only to itself.
_DEFAULT = object()
class _UniffiRustBuffer(ctypes.Structure):
    """ctypes layout of the buffer struct exchanged with the Rust scaffolding.

    Allocation, growth and release are delegated to the
    `ffi_pubkycore_rustbuffer_*` functions of `_UniffiLib`.
    """

    _fields_ = [
        ("capacity", ctypes.c_int32),
        ("len", ctypes.c_int32),
        ("data", ctypes.POINTER(ctypes.c_char)),
    ]

    @staticmethod
    def alloc(size):
        """Return a buffer obtained from `ffi_pubkycore_rustbuffer_alloc(size)`."""
        allocate = _UniffiLib.ffi_pubkycore_rustbuffer_alloc
        return _rust_call(allocate, size)

    @staticmethod
    def reserve(rbuf, additional):
        """Return the result of `ffi_pubkycore_rustbuffer_reserve(rbuf, additional)`."""
        grow = _UniffiLib.ffi_pubkycore_rustbuffer_reserve
        return _rust_call(grow, rbuf, additional)

    def free(self):
        """Hand this buffer to `ffi_pubkycore_rustbuffer_free`."""
        release = _UniffiLib.ffi_pubkycore_rustbuffer_free
        return _rust_call(release, self)

    def __str__(self):
        # Only the first `len` bytes behind `data` are shown.
        payload = self.data[0:self.len]
        return f"_UniffiRustBuffer(capacity={self.capacity}, len={self.len}, data={payload})"

    @contextlib.contextmanager
    def alloc_with_builder(*args):
        """Context manager that yields a fresh _UniffiRustBufferBuilder.

        If the managed block raises, the builder is discarded (freeing its
        buffer) before the exception propagates, so the buffer is not leaked.
        On success the builder is left untouched for the caller to finalize.
        """
        pending = _UniffiRustBufferBuilder()
        try:
            yield pending
        except BaseException:
            pending.discard()
            raise

    @contextlib.contextmanager
    def consume_with_stream(self):
        """Context manager that yields a _UniffiRustBufferStream over this buffer.

        The buffer is always freed when the context exits, even on error.
        Raises RuntimeError if the block finishes with unread bytes remaining.
        """
        try:
            stream = _UniffiRustBufferStream.from_rust_buffer(self)
            yield stream
            leftover = stream.remaining()
            if leftover != 0:
                raise RuntimeError("junk data left in buffer at end of consume_with_stream")
        finally:
            self.free()

    @contextlib.contextmanager
    def read_with_stream(self):
        """Context manager that yields a _UniffiRustBufferStream without freeing.

        Same checks as consume_with_stream, but the buffer is left allocated;
        intended for borrowed `_UniffiRustBuffer` data.
        """
        stream = _UniffiRustBufferStream.from_rust_buffer(self)
        yield stream
        leftover = stream.remaining()
        if leftover != 0:
            raise RuntimeError("junk data left in buffer at end of read_with_stream")
class _UniffiForeignBytes(ctypes.Structure):
    """ctypes struct holding a byte length and a pointer to the bytes."""

    _fields_ = [
        ("len", ctypes.c_int32),
        ("data", ctypes.POINTER(ctypes.c_char)),
    ]

    def __str__(self):
        # Only the first `len` bytes behind `data` are shown.
        payload = self.data[0:self.len]
        return f"_UniffiForeignBytes(len={self.len}, data={payload})"
class _UniffiRustBufferStream:
    """
    Sequential reader over the bytes of a _UniffiRustBuffer.

    Keeps a read offset into `data`; every read advances it and raises
    InternalError instead of reading beyond `len`.
    """

    def __init__(self, data, len):
        self.data = data
        self.len = len
        self.offset = 0

    @classmethod
    def from_rust_buffer(cls, buf):
        """Build a stream over `buf.data` limited to `buf.len` bytes."""
        return cls(buf.data, buf.len)

    def remaining(self):
        """Number of bytes not yet consumed."""
        return self.len - self.offset

    def _unpack_from(self, size, format):
        """Decode `size` bytes at the current offset with `struct` format `format`."""
        end = self.offset + size
        if end > self.len:
            raise InternalError("read past end of rust buffer")
        value = struct.unpack(format, self.data[self.offset:end])[0]
        self.offset = end
        return value

    def read(self, size):
        """Return the next `size` raw bytes and advance past them."""
        end = self.offset + size
        if end > self.len:
            raise InternalError("read past end of rust buffer")
        chunk = self.data[self.offset:end]
        self.offset = end
        return chunk

    # Fixed-width readers; integers and floats are decoded big-endian (">").

    def read_i8(self):
        return self._unpack_from(1, ">b")

    def read_u8(self):
        return self._unpack_from(1, ">B")

    def read_i16(self):
        return self._unpack_from(2, ">h")

    def read_u16(self):
        return self._unpack_from(2, ">H")

    def read_i32(self):
        return self._unpack_from(4, ">i")

    def read_u32(self):
        return self._unpack_from(4, ">I")

    def read_i64(self):
        return self._unpack_from(8, ">q")

    def read_u64(self):
        return self._unpack_from(8, ">Q")

    def read_float(self):
        return self._unpack_from(4, ">f")

    def read_double(self):
        return self._unpack_from(8, ">d")

    def read_c_size_t(self):
        # Native size and byte order ("@N"), unlike the big-endian readers above.
        width = ctypes.sizeof(ctypes.c_size_t)
        return self._unpack_from(width, "@N")
class _UniffiRustBufferBuilder:
    """
    Sequential writer that appends bytes to a _UniffiRustBuffer.

    `rbuf.len` tracks how many bytes have been written so far; the buffer is
    grown through `_UniffiRustBuffer.reserve` when a write would not fit.
    """

    def __init__(self):
        # Start from a 16-byte allocation and treat it as empty.
        self.rbuf = _UniffiRustBuffer.alloc(16)
        self.rbuf.len = 0

    def finalize(self):
        """Return the built buffer and detach it from this builder."""
        finished, self.rbuf = self.rbuf, None
        return finished

    def discard(self):
        """Free the buffer if this builder still holds one; otherwise do nothing."""
        if self.rbuf is None:
            return
        self.finalize().free()

    @contextlib.contextmanager
    def _reserve(self, num_bytes):
        """Make room for `num_bytes`, let the caller write, then bump `len`."""
        needed = self.rbuf.len + num_bytes
        if needed > self.rbuf.capacity:
            self.rbuf = _UniffiRustBuffer.reserve(self.rbuf, num_bytes)
        yield None
        # Only reached when the managed block did not raise.
        self.rbuf.len += num_bytes

    def _pack_into(self, size, format, value):
        """Encode `value` with `struct` format `format` and append the bytes."""
        with self._reserve(size):
            # XXX TODO: I feel like I should be able to use `struct.pack_into` here but can't figure it out.
            encoded = struct.pack(format, value)
            for position, byte in enumerate(encoded, start=self.rbuf.len):
                self.rbuf.data[position] = byte

    def write(self, value):
        """Append the raw bytes of `value`."""
        with self._reserve(len(value)):
            for position, byte in enumerate(value, start=self.rbuf.len):
                self.rbuf.data[position] = byte

    # Fixed-width writers; integers and floats are encoded big-endian (">").

    def write_i8(self, v):
        self._pack_into(1, ">b", v)

    def write_u8(self, v):
        self._pack_into(1, ">B", v)

    def write_i16(self, v):
        self._pack_into(2, ">h", v)

    def write_u16(self, v):
        self._pack_into(2, ">H", v)

    def write_i32(self, v):
        self._pack_into(4, ">i", v)

    def write_u32(self, v):
        self._pack_into(4, ">I", v)

    def write_i64(self, v):
        self._pack_into(8, ">q", v)

    def write_u64(self, v):
        self._pack_into(8, ">Q", v)

    def write_float(self, v):
        self._pack_into(4, ">f", v)

    def write_double(self, v):
        self._pack_into(8, ">d", v)

    def write_c_size_t(self, v):
        # Native size and byte order ("@N"), unlike the big-endian writers above.
        width = ctypes.sizeof(ctypes.c_size_t)
        self._pack_into(width, "@N", v)
# A handful of classes and functions to support the generated data structures.
# This would be a good candidate for isolating in its own ffi-support lib.
class InternalError(Exception):
    """Error raised by the binding helpers themselves.

    Used in this module for buffer over-reads, unexpected call statuses,
    Rust panics, and contract-version or checksum mismatches.
    """
class _UniffiRustCallStatus(ctypes.Structure):
    """
    Error runtime.

    Status struct passed by reference to scaffolding calls: `code` holds one
    of the CALL_* values below and `error_buf` carries the error payload.
    """

    _fields_ = [
        ("code", ctypes.c_int8),
        ("error_buf", _UniffiRustBuffer),
    ]

    # These match the values from the uniffi::rustcalls module
    CALL_SUCCESS = 0
    CALL_ERROR = 1
    CALL_PANIC = 2

    def __str__(self):
        # Map each known code to its symbolic name; anything else is invalid.
        labels = {
            _UniffiRustCallStatus.CALL_SUCCESS: "CALL_SUCCESS",
            _UniffiRustCallStatus.CALL_ERROR: "CALL_ERROR",
            _UniffiRustCallStatus.CALL_PANIC: "CALL_PANIC",
        }
        label = labels.get(self.code, "<invalid code>")
        return "_UniffiRustCallStatus({})".format(label)
def _rust_call(fn, *args):
    """Call a rust function without an error converter.

    Delegates to `_rust_call_with_error` with `None` as the converter and
    returns whatever that call returns.
    """
    no_error_converter = None
    return _rust_call_with_error(no_error_converter, fn, *args)
def _rust_call_with_error(error_ffi_converter, fn, *args):
    """Call a rust function and handle any errors.

    This function is used for rust calls that return Result<> and therefore
    can set the CALL_ERROR status code. `error_ffi_converter` must be the
    _UniffiConverter for the error class that corresponds to the result.

    A fresh call status (CALL_SUCCESS, empty error buffer) is passed by
    reference as the last argument of `fn`; it is inspected after the call
    and the return value of `fn` is handed back when no error is raised.
    """
    status = _UniffiRustCallStatus(
        code=_UniffiRustCallStatus.CALL_SUCCESS,
        error_buf=_UniffiRustBuffer(0, 0, None),
    )
    outcome = fn(*args, ctypes.byref(status))
    _uniffi_check_call_status(error_ffi_converter, status)
    return outcome
def _uniffi_check_call_status(error_ffi_converter, call_status):
    """Raise the Python exception matching `call_status`, if any.

    Returns None for CALL_SUCCESS. For CALL_ERROR the error buffer is lifted
    through `error_ffi_converter` and raised; without a converter the buffer
    is freed and InternalError is raised. CALL_PANIC and unknown codes raise
    InternalError.
    """
    code = call_status.code

    if code == _UniffiRustCallStatus.CALL_SUCCESS:
        return

    if code == _UniffiRustCallStatus.CALL_ERROR:
        if error_ffi_converter is not None:
            raise error_ffi_converter.lift(call_status.error_buf)
        call_status.error_buf.free()
        raise InternalError("_rust_call_with_error: CALL_ERROR, but error_ffi_converter is None")

    if code == _UniffiRustCallStatus.CALL_PANIC:
        # When the rust code sees a panic, it tries to construct a _UniffiRustBuffer
        # with the message. But if that code panics, then it just sends back
        # an empty buffer.
        panic_buf = call_status.error_buf
        msg = _UniffiConverterString.lift(panic_buf) if panic_buf.len > 0 else "Unknown rust panic"
        raise InternalError(msg)

    raise InternalError("Invalid _UniffiRustCallStatus code: {}".format(
        call_status.code))
# A function pointer for a callback as defined by UniFFI.
# Rust definition `fn(handle: u64, method: u32, args: _UniffiRustBuffer, buf_ptr: *mut _UniffiRustBuffer) -> int`
# NOTE(review): the ctypes type below declares five parameters where the quoted Rust
# signature lists four — `args` appears here as a `POINTER(c_char)` followed by a
# `c_int` (presumably data pointer + length; confirm against the uniffi version in use).
# NOTE(review): `method` is declared as `c_ulong`, which is 64 bits wide on LP64
# platforms, while the comment documents it as `u32` — verify this is intended.
_UNIFFI_FOREIGN_CALLBACK_T = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_ulonglong, ctypes.c_ulong, ctypes.POINTER(ctypes.c_char), ctypes.c_int, ctypes.POINTER(_UniffiRustBuffer))
# UniFFI future continuation
# Callback type returning nothing and taking a `c_size_t` and a `c_int8`.
_UNIFFI_FUTURE_CONTINUATION_T = ctypes.CFUNCTYPE(None, ctypes.c_size_t, ctypes.c_int8)
class _UniffiPointerManagerCPython:
    """
    Hands out opaque pointers to Python objects when running on CPython.

    The "pointer" is the object's address (`id(obj)`), and the object is kept
    alive by an extra reference until the pointer is released. See
    _UniffiPointerManagerGeneral for the implementation used elsewhere.
    """

    def new_pointer(self, obj):
        """
        Return an integer address for `obj` that can be passed to Rust.

        Each call to new_pointer() must be balanced with exactly one call to
        release_pointer(). The value is pointer-sized and can be interchanged
        with pointers for FFI function arguments and return values.
        """
        # Take an extra reference since a pointer to the object goes to Rust.
        ctypes.pythonapi.Py_IncRef(ctypes.py_object(obj))
        # id() is the object address on CPython
        # (https://docs.python.org/3/library/functions.html#id)
        return id(obj)

    def release_pointer(self, address):
        """Drop the reference taken by new_pointer() and return the object."""
        handle = ctypes.cast(address, ctypes.py_object)
        # Fetch the object before giving up the extra reference.
        target = handle.value
        ctypes.pythonapi.Py_DecRef(handle)
        return target

    def lookup(self, address):
        """Return the object behind `address` without changing its refcount."""
        handle = ctypes.cast(address, ctypes.py_object)
        return handle.value
class _UniffiPointerManagerGeneral:
    """
    Manage giving out pointers to Python objects on non-CPython platforms

    This has the same API as _UniffiPointerManagerCPython, but doesn't assume we're running on
    CPython and is slightly slower.

    Instead of using real pointers, it maps integer values to objects and returns the keys as
    handles. All access to the map is serialized by a lock.
    """

    def __init__(self):
        # `threading` is not among the imports at the top of this module, so
        # import it here to keep the class usable on its own.
        import threading

        self._map = {}
        self._lock = threading.Lock()
        self._current_handle = 0

    def new_pointer(self, obj):
        """Store `obj` under a fresh integer handle and return that handle."""
        with self._lock:
            handle = self._current_handle
            self._current_handle += 1
            self._map[handle] = obj
        return handle

    def release_pointer(self, handle):
        """Remove `handle` from the map and return its object.

        Raises KeyError if the handle is unknown or already released.
        """
        with self._lock:
            return self._map.pop(handle)

    def lookup(self, handle):
        """Return the object stored under `handle` (KeyError if unknown)."""
        with self._lock:
            return self._map[handle]
# Pick a pointer manager implementation based on the running interpreter:
# the address-based one on CPython, the handle-map one everywhere else.
_UniffiPointerManager = (  # type: ignore
    _UniffiPointerManagerCPython
    if platform.python_implementation() == 'CPython'
    else _UniffiPointerManagerGeneral
)
# Types conforming to `_UniffiConverterPrimitive` pass themselves directly over the FFI.
class _UniffiConverterPrimitive:
    """Base converter whose values cross the FFI unchanged.

    `check` is the validation hook (identity here); `lower` and `write` run it
    before delegating to `lowerUnchecked` / `write_unchecked`. This base class
    does not define `write_unchecked` itself.
    """

    @classmethod
    def check(cls, value):
        """Validation hook; the base implementation accepts anything."""
        return value

    @classmethod
    def lift(cls, value):
        """Return the FFI value as-is."""
        return value

    @classmethod
    def lower(cls, value):
        """Validate `value`, then lower it."""
        lower_unchecked = cls.lowerUnchecked
        return lower_unchecked(cls.check(value))

    @classmethod
    def lowerUnchecked(cls, value):
        """Return `value` as-is, without validation."""
        return value

    @classmethod
    def write(cls, value, buf):
        """Validate `value`, then write it to `buf` via `write_unchecked`."""
        write_unchecked = cls.write_unchecked
        write_unchecked(cls.check(value), buf)
class _UniffiConverterPrimitiveInt(_UniffiConverterPrimitive):
    """Primitive converter that validates integers against a half-open range.

    Reads `VALUE_MIN`, `VALUE_MAX` and `CLASS_NAME` from the class it is
    called on; none of them is defined here.
    """

    @classmethod
    def check(cls, value):
        """Return `value` coerced through `__index__`.

        Raises TypeError if the value cannot be turned into an int, and
        ValueError if it lies outside `VALUE_MIN <= value < VALUE_MAX`.
        """
        try:
            value = value.__index__()
        except Exception:
            type_name = type(value).__name__
            raise TypeError("'{}' object cannot be interpreted as an integer".format(type_name))
        if not isinstance(value, int):
            type_name = type(value).__name__
            raise TypeError("__index__ returned non-int (type {})".format(type_name))
        if value < cls.VALUE_MIN or value >= cls.VALUE_MAX:
            raise ValueError("{} requires {} <= value < {}".format(cls.CLASS_NAME, cls.VALUE_MIN, cls.VALUE_MAX))
        return super().check(value)
class _UniffiConverterPrimitiveFloat(_UniffiConverterPrimitive):
    """Primitive converter that validates floating-point values."""

    @classmethod
    def check(cls, value):
        """Return `value` coerced through `__float__`.

        Raises TypeError if the value has no usable `__float__` or if
        `__float__` returns something that is not a float.
        """
        try:
            value = value.__float__()
        except Exception:
            type_name = type(value).__name__
            raise TypeError("must be real number, not {}".format(type_name))
        if not isinstance(value, float):
            type_name = type(value).__name__
            raise TypeError("__float__ returned non-float (type {})".format(type_name))
        return super().check(value)
# Helper class for wrapper types that will always go through a _UniffiRustBuffer.
# Classes should inherit from this and implement the `read` and `write` static methods.
class _UniffiConverterRustBuffer:
    """Base converter for values serialized into a _UniffiRustBuffer."""

    @classmethod
    def lift(cls, rbuf):
        """Decode a value from `rbuf` with `cls.read`.

        The buffer is opened with `rbuf.consume_with_stream()`.
        """
        with rbuf.consume_with_stream() as stream:
            decoded = cls.read(stream)
        return decoded

    @classmethod
    def lower(cls, value):
        """Serialize `value` with `cls.write` and return the finalized buffer."""
        with _UniffiRustBuffer.alloc_with_builder() as builder:
            cls.write(value, builder)
            lowered = builder.finalize()
        return lowered
# Contains loading, initialization code, and the FFI Function declarations.
# Define some ctypes FFI types that we use in the library

# ctypes type for the foreign executor callback. This is a built-in interface for
# scheduling tasks.
#
# Args:
#   executor: opaque c_size_t value representing the eventloop
#   delay: delay in ms
#   task: function pointer to the task callback
#   task_data: void pointer to the task callback data
#
# Normally we should call task(task_data) after the delay.
# However, when task is NULL this indicates that Rust has dropped the ForeignExecutor
# and we should decrease the EventLoop refcount.
_UNIFFI_FOREIGN_EXECUTOR_CALLBACK_T = ctypes.CFUNCTYPE(
    ctypes.c_int8,
    ctypes.c_size_t,
    ctypes.c_uint32,
    ctypes.c_void_p,
    ctypes.c_void_p,
)

# Function pointer for a Rust task, which is a callback function that takes an
# opaque pointer (followed here by a c_int8).
_UNIFFI_RUST_TASK = ctypes.CFUNCTYPE(
    None,
    ctypes.c_void_p,
    ctypes.c_int8,
)
def _uniffi_future_callback_t(return_type):
    """
    Build the ctypes callback type used by async functions.

    The resulting callback returns nothing and takes a `c_size_t`, a value of
    `return_type`, and a `_UniffiRustCallStatus` (by value).
    """
    callback_args = (ctypes.c_size_t, return_type, _UniffiRustCallStatus)
    return ctypes.CFUNCTYPE(None, *callback_args)
def _uniffi_load_indirect():
    """
    Find and load the dynamic library provided by the component.

    The library is looked up by name in the directory containing this file:
    `libpubkycore.dylib` on macOS, `pubkycore.dll` on Windows and
    `libpubkycore.so` everywhere else.

    Returns:
        The `ctypes.CDLL` handle produced by `ctypes.cdll.LoadLibrary`.

    Raises:
        OSError: if the library cannot be loaded.
    """
    if sys.platform == "darwin":
        filename = "lib{}.dylib"
    elif sys.platform.startswith("win"):
        # As of python3.8, ctypes does not seem to search $PATH when loading DLLs.
        # We could use `os.add_dll_directory` to configure the search path, but
        # it doesn't feel right to mess with application-wide settings. Let's
        # assume that the `.dll` is next to the `.py` file and load by full path.
        filename = "{}.dll"
    else:
        # Anything else must be an ELF platform - Linux, *BSD, Solaris/illumos
        filename = "lib{}.so"

    # Format only the bare file name, so braces in the directory path cannot
    # confuse `str.format`, and join the module directory exactly once so a
    # relative `__file__` does not end up with the directory doubled.
    filename = filename.format("pubkycore")
    path = os.path.join(os.path.dirname(__file__), filename)
    return ctypes.cdll.LoadLibrary(path)
def _uniffi_check_contract_api_version(lib):
    """Verify that the loaded library uses the contract version these bindings expect.

    Raises InternalError when `lib.ffi_pubkycore_uniffi_contract_version()`
    differs from the version baked into this file.
    """
    # Contract version these bindings were generated for
    expected_version = 24
    # Contract version reported by the scaffolding in the dylib
    reported_version = lib.ffi_pubkycore_uniffi_contract_version()
    if reported_version == expected_version:
        return
    raise InternalError("UniFFI contract version mismatch: try cleaning and rebuilding your project")
def _uniffi_check_api_checksums(lib):
    """Compare every API checksum exported by `lib` with the value expected here.

    The checksum functions are called in the order listed and the first
    mismatch raises InternalError.
    """
    expected_checksums = (
        ("uniffi_pubkycore_checksum_func_auth", 51826),
        ("uniffi_pubkycore_checksum_func_create_recovery_file", 48846),
        ("uniffi_pubkycore_checksum_func_decrypt_recovery_file", 26407),
        ("uniffi_pubkycore_checksum_func_delete_file", 9063),
        ("uniffi_pubkycore_checksum_func_generate_secret_key", 12800),
        ("uniffi_pubkycore_checksum_func_get", 6591),
        ("uniffi_pubkycore_checksum_func_get_public_key_from_secret_key", 40316),
        ("uniffi_pubkycore_checksum_func_list", 43198),
        ("uniffi_pubkycore_checksum_func_parse_auth_url", 27379),
        ("uniffi_pubkycore_checksum_func_publish", 48989),
        ("uniffi_pubkycore_checksum_func_publish_https", 5614),
        ("uniffi_pubkycore_checksum_func_put", 48150),
        ("uniffi_pubkycore_checksum_func_remove_event_listener", 23534),
        ("uniffi_pubkycore_checksum_func_resolve", 34317),
        ("uniffi_pubkycore_checksum_func_resolve_https", 17266),
        ("uniffi_pubkycore_checksum_func_session", 59795),
        ("uniffi_pubkycore_checksum_func_set_event_listener", 60071),
        ("uniffi_pubkycore_checksum_func_sign_in", 21584),
        ("uniffi_pubkycore_checksum_func_sign_out", 34903),
        ("uniffi_pubkycore_checksum_func_sign_up", 37999),
        ("uniffi_pubkycore_checksum_func_switch_network", 64215),
        ("uniffi_pubkycore_checksum_method_eventlistener_on_event_occurred", 11531),
    )
    for symbol, expected in expected_checksums:
        if getattr(lib, symbol)() != expected:
            raise InternalError("UniFFI API checksum mismatch: try cleaning and rebuilding your project")
# A ctypes library to expose the extern-C FFI definitions.
# This is an implementation detail which will be called internally by the public API.
_UniffiLib = _uniffi_load_indirect()

# Both functions below return nothing and take one leading argument followed
# by a pointer to the call status.
for _uniffi_symbol, _uniffi_first_arg in (
    ("uniffi_pubkycore_fn_free_eventnotifier", ctypes.c_void_p),
    ("uniffi_pubkycore_fn_init_callback_eventlistener", _UNIFFI_FOREIGN_CALLBACK_T),
):
    _uniffi_fn = getattr(_UniffiLib, _uniffi_symbol)
    _uniffi_fn.argtypes = (_uniffi_first_arg, ctypes.POINTER(_UniffiRustCallStatus))
    _uniffi_fn.restype = None
# Keep the module namespace free of the loop temporaries.
del _uniffi_symbol, _uniffi_first_arg, _uniffi_fn
# ctypes signatures of the `uniffi_pubkycore_fn_func_*` scaffolding functions.
# Each row is (function suffix, leading argument types, return type); every
# function additionally takes a pointer to the call status as its last argument.
for _uniffi_suffix, _uniffi_leading_args, _uniffi_restype in (
    ("auth", (_UniffiRustBuffer, _UniffiRustBuffer), _UniffiRustBuffer),
    ("create_recovery_file", (_UniffiRustBuffer, _UniffiRustBuffer), _UniffiRustBuffer),
    ("decrypt_recovery_file", (_UniffiRustBuffer, _UniffiRustBuffer), _UniffiRustBuffer),
    ("delete_file", (_UniffiRustBuffer,), _UniffiRustBuffer),
    ("generate_secret_key", (), _UniffiRustBuffer),
    ("get", (_UniffiRustBuffer,), _UniffiRustBuffer),
    ("get_public_key_from_secret_key", (_UniffiRustBuffer,), _UniffiRustBuffer),
    ("list", (_UniffiRustBuffer,), _UniffiRustBuffer),
    ("parse_auth_url", (_UniffiRustBuffer,), _UniffiRustBuffer),
    ("publish", (_UniffiRustBuffer, _UniffiRustBuffer, _UniffiRustBuffer), _UniffiRustBuffer),
    ("publish_https", (_UniffiRustBuffer, _UniffiRustBuffer, _UniffiRustBuffer), _UniffiRustBuffer),
    ("put", (_UniffiRustBuffer, _UniffiRustBuffer), _UniffiRustBuffer),
    ("remove_event_listener", (), None),
    ("resolve", (_UniffiRustBuffer,), _UniffiRustBuffer),
    ("resolve_https", (_UniffiRustBuffer,), _UniffiRustBuffer),
    ("session", (_UniffiRustBuffer,), _UniffiRustBuffer),
    ("set_event_listener", (ctypes.c_uint64,), None),
    ("sign_in", (_UniffiRustBuffer,), _UniffiRustBuffer),
    ("sign_out", (_UniffiRustBuffer,), _UniffiRustBuffer),
    ("sign_up", (_UniffiRustBuffer, _UniffiRustBuffer), _UniffiRustBuffer),
    ("switch_network", (ctypes.c_int8,), _UniffiRustBuffer),
):
    _uniffi_fn = getattr(_UniffiLib, "uniffi_pubkycore_fn_func_" + _uniffi_suffix)
    _uniffi_fn.argtypes = _uniffi_leading_args + (ctypes.POINTER(_UniffiRustCallStatus),)
    _uniffi_fn.restype = _uniffi_restype
# Keep the module namespace free of the loop temporaries.
del _uniffi_suffix, _uniffi_leading_args, _uniffi_restype, _uniffi_fn
# ctypes signatures of the buffer-management functions and of the future
# continuation setter. Each row is (symbol, full argument types, return type).
for _uniffi_symbol, _uniffi_argtypes, _uniffi_restype in (
    (
        "ffi_pubkycore_rustbuffer_alloc",
        (ctypes.c_int32, ctypes.POINTER(_UniffiRustCallStatus)),
        _UniffiRustBuffer,
    ),
    (
        "ffi_pubkycore_rustbuffer_from_bytes",
        (_UniffiForeignBytes, ctypes.POINTER(_UniffiRustCallStatus)),
        _UniffiRustBuffer,
    ),
    (
        "ffi_pubkycore_rustbuffer_free",
        (_UniffiRustBuffer, ctypes.POINTER(_UniffiRustCallStatus)),
        None,
    ),
    (
        "ffi_pubkycore_rustbuffer_reserve",
        (_UniffiRustBuffer, ctypes.c_int32, ctypes.POINTER(_UniffiRustCallStatus)),
        _UniffiRustBuffer,
    ),
    (
        # Unlike the functions above, this one takes no call-status pointer.
        "ffi_pubkycore_rust_future_continuation_callback_set",
        (_UNIFFI_FUTURE_CONTINUATION_T,),
        None,
    ),
):
    _uniffi_fn = getattr(_UniffiLib, _uniffi_symbol)
    _uniffi_fn.argtypes = _uniffi_argtypes
    _uniffi_fn.restype = _uniffi_restype
# Keep the module namespace free of the loop temporaries.
del _uniffi_symbol, _uniffi_argtypes, _uniffi_restype, _uniffi_fn
# ctypes signatures of the `ffi_pubkycore_rust_future_{poll,cancel,free,complete}_*`
# families. Every family has the same shape; only the return type of `complete`
# varies, so each row is (family suffix, return type of `complete`).
for _uniffi_suffix, _uniffi_complete_restype in (
    ("u8", ctypes.c_uint8),
    ("i8", ctypes.c_int8),
    ("u16", ctypes.c_uint16),
    ("i16", ctypes.c_int16),
    ("u32", ctypes.c_uint32),
    ("i32", ctypes.c_int32),
    ("u64", ctypes.c_uint64),
    ("i64", ctypes.c_int64),
    ("f32", ctypes.c_float),
    ("f64", ctypes.c_double),
    ("pointer", ctypes.c_void_p),
    ("rust_buffer", _UniffiRustBuffer),
    ("void", None),
):
    # poll(c_void_p, c_size_t) -> None
    _uniffi_fn = getattr(_UniffiLib, "ffi_pubkycore_rust_future_poll_" + _uniffi_suffix)
    _uniffi_fn.argtypes = (ctypes.c_void_p, ctypes.c_size_t)
    _uniffi_fn.restype = None

    # cancel(c_void_p) -> None
    _uniffi_fn = getattr(_UniffiLib, "ffi_pubkycore_rust_future_cancel_" + _uniffi_suffix)
    _uniffi_fn.argtypes = (ctypes.c_void_p,)
    _uniffi_fn.restype = None

    # free(c_void_p) -> None
    _uniffi_fn = getattr(_UniffiLib, "ffi_pubkycore_rust_future_free_" + _uniffi_suffix)
    _uniffi_fn.argtypes = (ctypes.c_void_p,)
    _uniffi_fn.restype = None

    # complete(c_void_p, pointer to call status) -> family-specific type
    _uniffi_fn = getattr(_UniffiLib, "ffi_pubkycore_rust_future_complete_" + _uniffi_suffix)
    _uniffi_fn.argtypes = (ctypes.c_void_p, ctypes.POINTER(_UniffiRustCallStatus))
    _uniffi_fn.restype = _uniffi_complete_restype
# Keep the module namespace free of the loop temporaries.
del _uniffi_suffix, _uniffi_complete_restype, _uniffi_fn
# Every checksum entry point takes no arguments and returns an unsigned
# 16-bit value, so the signatures are declared from a single table.
for _checksum_symbol in (
    "uniffi_pubkycore_checksum_func_auth",
    "uniffi_pubkycore_checksum_func_create_recovery_file",
    "uniffi_pubkycore_checksum_func_decrypt_recovery_file",
    "uniffi_pubkycore_checksum_func_delete_file",
    "uniffi_pubkycore_checksum_func_generate_secret_key",
    "uniffi_pubkycore_checksum_func_get",
    "uniffi_pubkycore_checksum_func_get_public_key_from_secret_key",
    "uniffi_pubkycore_checksum_func_list",
    "uniffi_pubkycore_checksum_func_parse_auth_url",
    "uniffi_pubkycore_checksum_func_publish",
    "uniffi_pubkycore_checksum_func_publish_https",
    "uniffi_pubkycore_checksum_func_put",
    "uniffi_pubkycore_checksum_func_remove_event_listener",
    "uniffi_pubkycore_checksum_func_resolve",
    "uniffi_pubkycore_checksum_func_resolve_https",
    "uniffi_pubkycore_checksum_func_session",
    "uniffi_pubkycore_checksum_func_set_event_listener",
    "uniffi_pubkycore_checksum_func_sign_in",
    "uniffi_pubkycore_checksum_func_sign_out",
    "uniffi_pubkycore_checksum_func_sign_up",
    "uniffi_pubkycore_checksum_func_switch_network",
    "uniffi_pubkycore_checksum_method_eventlistener_on_event_occurred",
):
    _checksum_fn = getattr(_UniffiLib, _checksum_symbol)
    _checksum_fn.argtypes = ()
    _checksum_fn.restype = ctypes.c_uint16

# Do not leave the loop temporaries behind in the module namespace.
del _checksum_symbol, _checksum_fn
# The contract-version entry point takes no arguments and returns a u32.
_UniffiLib.ffi_pubkycore_uniffi_contract_version.argtypes = ()
_UniffiLib.ffi_pubkycore_uniffi_contract_version.restype = ctypes.c_uint32

# With all signatures declared, run the two library checks at import time,
# before any of the public wrappers below are defined.
_uniffi_check_contract_api_version(_UniffiLib)
_uniffi_check_api_checksums(_UniffiLib)
# Async support
# Public interface members begin here.
class _UniffiConverterBool(_UniffiConverterPrimitive):
    """Converter for booleans, which travel through buffers as a single u8."""

    @classmethod
    def check(cls, value):
        """Collapse ``value`` to its truthiness; any object is accepted."""
        return bool(value)

    @classmethod
    def read(cls, buf):
        """Read one u8 from ``buf`` and lift it to a Python bool."""
        raw = buf.read_u8()
        return cls.lift(raw)

    @classmethod
    def write_unchecked(cls, value, buf):
        """Write ``value`` to ``buf`` as a u8 without validating it first."""
        buf.write_u8(value)

    @staticmethod
    def lift(value):
        """Return True for any non-zero ``value``, False for zero."""
        return value != 0
class _UniffiConverterString:
    """Converter between Python ``str`` and UTF-8 encoded bytes.

    Inside a larger buffer (``read``/``write``) a string is an i32 byte length
    followed by the UTF-8 bytes. As a standalone value (``lift``/``lower``) the
    whole buffer content is the UTF-8 payload, with no length prefix.
    """

    @staticmethod
    def check(value):
        """Return ``value`` unchanged if it is a ``str``; raise TypeError otherwise."""
        if isinstance(value, str):
            return value
        raise TypeError("argument must be str, not {}".format(type(value).__name__))

    @staticmethod
    def read(buf):
        """Read a length-prefixed UTF-8 string from ``buf``.

        Raises InternalError when the length prefix is negative.
        """
        length = buf.read_i32()
        if length < 0:
            raise InternalError("Unexpected negative string length")
        return buf.read(length).decode("utf-8")

    @staticmethod
    def write(value, buf):
        """Write ``value`` to ``buf`` as an i32 byte length followed by UTF-8 bytes."""
        encoded = _UniffiConverterString.check(value).encode("utf-8")
        buf.write_i32(len(encoded))
        buf.write(encoded)

    @staticmethod
    def lift(buf):
        """Consume ``buf`` and decode everything it holds as UTF-8."""
        with buf.consume_with_stream() as stream:
            payload = stream.read(stream.remaining())
            return payload.decode("utf-8")

    @staticmethod
    def lower(value):
        """Encode ``value`` as UTF-8 into a newly allocated ``_UniffiRustBuffer``."""
        text = _UniffiConverterString.check(value)
        with _UniffiRustBuffer.alloc_with_builder() as builder:
            builder.write(text.encode("utf-8"))
            return builder.finalize()
class EventNotifier:
    """Object wrapping a raw pointer that is released through
    ``uniffi_pubkycore_fn_free_eventnotifier`` when the instance is finalized.
    """

    _pointer: ctypes.c_void_p

    def __del__(self):
        # A partially initialized instance may never have received a pointer,
        # in which case there is nothing to free.
        ptr = getattr(self, "_pointer", None)
        if ptr is None:
            return
        _rust_call(_UniffiLib.uniffi_pubkycore_fn_free_eventnotifier, ptr)

    @classmethod
    def _make_instance_(cls, pointer):
        """Build an instance around ``pointer`` without running ``__init__``.

        Used by alternative constructors or any methods which return this type.
        """
        instance = cls.__new__(cls)
        instance._pointer = pointer
        return instance
class _UniffiConverterTypeEventNotifier:
    """Converter between ``EventNotifier`` instances and their raw pointer value."""

    @classmethod
    def read(cls, buf):
        """Read a u64 pointer from ``buf`` and wrap it in an ``EventNotifier``.

        Raises InternalError when the pointer value is zero.
        """
        raw_pointer = buf.read_u64()
        if raw_pointer != 0:
            return cls.lift(raw_pointer)
        raise InternalError("Raw pointer value was null")

    @classmethod
    def write(cls, value, buf):
        """Write the pointer of ``value`` to ``buf`` as a u64.

        Raises TypeError when ``value`` is not an ``EventNotifier``.
        """
        if isinstance(value, EventNotifier):
            buf.write_u64(cls.lower(value))
            return
        raise TypeError("Expected EventNotifier instance, {} found".format(type(value).__name__))

    @staticmethod
    def lift(value):
        """Wrap the raw pointer ``value`` in a new ``EventNotifier``."""
        return EventNotifier._make_instance_(value)

    @staticmethod
    def lower(value):
        """Return the raw pointer stored on ``value``."""
        return value._pointer
import threading
class ConcurrentHandleMap:
    """
    Bidirectional object <-> integer handle registry.

    Every operation runs while holding a single lock, so inserting, getting
    and removing data is synchronized.
    """

    def __init__(self):
        self._lock = threading.Lock()
        # handle (int) -> object
        self._left_map = {}
        # object -> handle (int); stored objects therefore have to be hashable
        self._right_map = {}
        # Next handle to hand out, and the step between consecutive handles.
        self._current_handle = 0
        self._stride = 1

    def insert(self, obj):
        """Return the handle for ``obj``, allocating a new one on first insertion."""
        with self._lock:
            if obj in self._right_map:
                return self._right_map[obj]
            new_handle = self._current_handle
            self._current_handle += self._stride
            self._left_map[new_handle] = obj
            self._right_map[obj] = new_handle
            return new_handle

    def get(self, handle):
        """Return the object stored under ``handle``, or None if there is none."""
        with self._lock:
            return self._left_map.get(handle)

    def remove(self, handle):
        """Forget ``handle`` and return its object; return None if it is unknown."""
        with self._lock:
            if handle not in self._left_map:
                return None
            removed = self._left_map.pop(handle)
            del self._right_map[removed]
            return removed
# Magic number for the Rust proxy to call using the same mechanism as every other method,
# to free the callback once it's dropped by Rust.
# The foreign callback below compares its `method` argument against this value and,
# on a match, drops the handle instead of dispatching to a listener method.
IDX_CALLBACK_FREE = 0
# Return codes for callback calls
# Returned when the requested method (or the free request) completed normally.
_UNIFFI_CALLBACK_SUCCESS = 0
# Not returned by any code visible in this part of the file.
_UNIFFI_CALLBACK_ERROR = 1
# Returned when the listener raised, or when the method index is not recognised.
_UNIFFI_CALLBACK_UNEXPECTED_ERROR = 2
class _UniffiConverterCallbackInterface:
    """Converter that passes Python callback objects across the FFI as integer handles.

    ``_handle_map`` is a class attribute, so every converter instance created
    from this class shares the same handle registry.
    """

    _handle_map = ConcurrentHandleMap()

    def __init__(self, cb):
        # The foreign callback wrapper handed in at construction time. It is
        # stored on the instance, which keeps a reference to it for as long as
        # the converter itself is alive.
        self._foreign_callback = cb

    def drop(self, handle):
        """Remove ``handle`` from the shared handle map (no-op if it is unknown)."""
        self.__class__._handle_map.remove(handle)

    @classmethod
    def lift(cls, handle):
        """Return the callback object registered under ``handle``.

        Raises InternalError when no object is registered for ``handle``.
        """
        obj = cls._handle_map.get(handle)
        # Compare against None explicitly: a registered object may legitimately
        # be falsy (e.g. define ``__bool__``/``__len__``), and a truthiness
        # test would wrongly report it as dropped.
        if obj is None:
            raise InternalError("The object in the handle map has been dropped already")
        return obj

    @classmethod
    def read(cls, buf):
        """Read a u64 handle from ``buf`` and return the callback object it refers to.

        Raises InternalError when no object is registered for the handle.
        """
        handle = buf.read_u64()
        # The lifted object must be returned; otherwise every read yields None.
        return cls.lift(handle)

    @classmethod
    def lower(cls, cb):
        """Register ``cb`` in the handle map and return its integer handle."""
        return cls._handle_map.insert(cb)

    @classmethod
    def write(cls, cb, buf):
        """Register ``cb`` and write its handle to ``buf`` as a u64."""
        buf.write_u64(cls.lower(cb))
# Declaration and _UniffiConverters for EventListener Callback Interface
class EventListener:
    """Base class for event listeners; subclasses override ``on_event_occurred``."""

    def on_event_occurred(self, event_data: "str"):
        """Handle one event described by ``event_data``.

        The base implementation always raises NotImplementedError.
        """
        raise NotImplementedError()
def py_foreignCallbackCallbackInterfaceEventListener(handle, method, args_data, args_len, buf_ptr):
    """Dispatch a call made through the foreign callback to a Python ``EventListener``.

    Args:
        handle: Handle of the listener in the callback-interface handle map.
        method: Method index. ``IDX_CALLBACK_FREE`` drops the handle; ``1``
            invokes ``on_event_occurred``.
        args_data, args_len: Serialized arguments, wrapped in a
            ``_UniffiRustBufferStream`` before being read.
        buf_ptr: Output slot; on an unexpected error ``buf_ptr[0]`` receives the
            lowered ``repr`` of the exception when that can be produced.

    Returns:
        ``_UNIFFI_CALLBACK_SUCCESS`` on success, otherwise
        ``_UNIFFI_CALLBACK_UNEXPECTED_ERROR``.

    Raises:
        InternalError: If no listener is registered for ``handle``.
    """

    def invoke_on_event_occurred(python_callback, args_stream, buf_ptr):
        # The single argument is a string read from the argument stream. The
        # listener's return value is ignored; the method returns nothing to Rust.
        python_callback.on_event_occurred(
            _UniffiConverterString.read(args_stream)
        )
        return _UNIFFI_CALLBACK_SUCCESS

    cb = _UniffiConverterCallbackInterfaceEventListener.lift(handle)
    # Identity check rather than truthiness: a listener object that happens to
    # be falsy is still a valid, registered callback.
    if cb is None:
        raise InternalError("No callback in handlemap; this is a uniffi bug")

    if method == IDX_CALLBACK_FREE:
        _UniffiConverterCallbackInterfaceEventListener.drop(handle)
        # Successful return
        # See docs of ForeignCallback in `uniffi_core/src/ffi/foreigncallbacks.rs`
        return _UNIFFI_CALLBACK_SUCCESS

    if method == 1:
        # Call the method and handle any errors
        # See docs of ForeignCallback in `uniffi_core/src/ffi/foreigncallbacks.rs` for details
        try:
            return invoke_on_event_occurred(cb, _UniffiRustBufferStream(args_data, args_len), buf_ptr)
        except BaseException as e:
            # Catch unexpected errors
            try:
                # Try to serialize the exception into a String
                buf_ptr[0] = _UniffiConverterString.lower(repr(e))
            except BaseException:
                # Deliberate best effort: if the message cannot be serialized,
                # give up on it and still report the error code below.
                pass
            return _UNIFFI_CALLBACK_UNEXPECTED_ERROR

    # This should never happen, because an out of bounds method index won't
    # ever be used. Once we can catch errors, we should return an InternalException.
    # https://github.com/mozilla/uniffi-rs/issues/351

    # An unexpected error happened.
    # See docs of ForeignCallback in `uniffi_core/src/ffi/foreigncallbacks.rs`
    return _UNIFFI_CALLBACK_UNEXPECTED_ERROR
# We need to keep this function reference alive:
# if it gets GC'd while in use then UniFFI internals could attempt to call a function
# that is in freed memory.
# Binding the wrapped callback to a module-level name keeps a reference to it
# for as long as the module is loaded.
foreignCallbackCallbackInterfaceEventListener = _UNIFFI_FOREIGN_CALLBACK_T(py_foreignCallbackCallbackInterfaceEventListener)
# Hand the wrapped callback to `uniffi_pubkycore_fn_init_callback_eventlistener`.
# The lambda receives one extra argument from `_rust_call` (named `err` here)
# and forwards it as the last parameter.
_rust_call(lambda err: _UniffiLib.uniffi_pubkycore_fn_init_callback_eventlistener(foreignCallbackCallbackInterfaceEventListener, err))
# The _UniffiConverter which transforms the Callbacks into Handles to pass to Rust.
# The Python callback function above looks this name up at call time, so it only
# has to exist before the first callback is actually invoked.
_UniffiConverterCallbackInterfaceEventListener = _UniffiConverterCallbackInterface(foreignCallbackCallbackInterfaceEventListener)
class _UniffiConverterSequenceString(_UniffiConverterRustBuffer):
    """Converter for sequences of strings: an i32 item count followed by each
    item in ``_UniffiConverterString`` form.
    """

    @classmethod
    def write(cls, value, buf):
        """Write the item count of ``value`` and then every item to ``buf``."""
        buf.write_i32(len(value))
        for element in value:
            _UniffiConverterString.write(element, buf)

    @classmethod
    def read(cls, buf):
        """Read an item count and that many strings from ``buf``; return them in order.

        Raises InternalError when the item count is negative.
        """
        length = buf.read_i32()
        if length < 0:
            raise InternalError("Unexpected negative sequence length")
        strings = []
        for _ in range(length):
            strings.append(_UniffiConverterString.read(buf))
        return strings
def auth(url: "str", secret_key: "str") -> "typing.List[str]":
    """Call ``uniffi_pubkycore_fn_func_auth`` with ``url`` and ``secret_key``.

    Both arguments are lowered as strings; the result is lifted into a list
    of strings.
    """
    url_arg = _UniffiConverterString.lower(url)
    secret_key_arg = _UniffiConverterString.lower(secret_key)
    raw = _rust_call(_UniffiLib.uniffi_pubkycore_fn_func_auth, url_arg, secret_key_arg)
    return _UniffiConverterSequenceString.lift(raw)
def create_recovery_file(secret_key: "str", passphrase: "str") -> "typing.List[str]":
    """Call ``uniffi_pubkycore_fn_func_create_recovery_file`` with ``secret_key`` and ``passphrase``.

    Both arguments are lowered as strings; the result is lifted into a list
    of strings.
    """
    secret_key_arg = _UniffiConverterString.lower(secret_key)
    passphrase_arg = _UniffiConverterString.lower(passphrase)
    raw = _rust_call(
        _UniffiLib.uniffi_pubkycore_fn_func_create_recovery_file,
        secret_key_arg,
        passphrase_arg,
    )
    return _UniffiConverterSequenceString.lift(raw)
def decrypt_recovery_file(recovery_file: "str", passphrase: "str") -> "typing.List[str]":
    """Call ``uniffi_pubkycore_fn_func_decrypt_recovery_file`` with ``recovery_file`` and ``passphrase``.

    Both arguments are lowered as strings; the result is lifted into a list
    of strings.
    """
    recovery_file_arg = _UniffiConverterString.lower(recovery_file)
    passphrase_arg = _UniffiConverterString.lower(passphrase)
    raw = _rust_call(
        _UniffiLib.uniffi_pubkycore_fn_func_decrypt_recovery_file,
        recovery_file_arg,
        passphrase_arg,
    )
    return _UniffiConverterSequenceString.lift(raw)
def delete_file(url: "str") -> "typing.List[str]":
    """Call ``uniffi_pubkycore_fn_func_delete_file`` with ``url``.

    The argument is lowered as a string; the result is lifted into a list of
    strings.
    """
    url_arg = _UniffiConverterString.lower(url)
    raw = _rust_call(_UniffiLib.uniffi_pubkycore_fn_func_delete_file, url_arg)
    return _UniffiConverterSequenceString.lift(raw)
def generate_secret_key() -> "typing.List[str]":
    """Call ``uniffi_pubkycore_fn_func_generate_secret_key`` (no arguments).

    The result is lifted into a list of strings.
    """
    raw = _rust_call(_UniffiLib.uniffi_pubkycore_fn_func_generate_secret_key)
    return _UniffiConverterSequenceString.lift(raw)
def get(url: "str") -> "typing.List[str]":
    """Call ``uniffi_pubkycore_fn_func_get`` with ``url``.

    The argument is lowered as a string; the result is lifted into a list of
    strings.
    """
    url_arg = _UniffiConverterString.lower(url)
    raw = _rust_call(_UniffiLib.uniffi_pubkycore_fn_func_get, url_arg)
    return _UniffiConverterSequenceString.lift(raw)
def get_public_key_from_secret_key(secret_key: "str") -> "typing.List[str]":
    """Call ``uniffi_pubkycore_fn_func_get_public_key_from_secret_key`` with ``secret_key``.

    The argument is lowered as a string; the result is lifted into a list of
    strings.
    """
    secret_key_arg = _UniffiConverterString.lower(secret_key)
    raw = _rust_call(
        _UniffiLib.uniffi_pubkycore_fn_func_get_public_key_from_secret_key,
        secret_key_arg,
    )
    return _UniffiConverterSequenceString.lift(raw)
def list(url: "str") -> "typing.List[str]":
    """Call ``uniffi_pubkycore_fn_func_list`` with ``url``.

    The argument is lowered as a string; the result is lifted into a list of
    strings.

    NOTE: this public name shadows the builtin ``list`` for all module-level
    code that runs after this definition.
    """
    url_arg = _UniffiConverterString.lower(url)
    raw = _rust_call(_UniffiLib.uniffi_pubkycore_fn_func_list, url_arg)
    return _UniffiConverterSequenceString.lift(raw)
def parse_auth_url(url: "str") -> "typing.List[str]":
    """Call ``uniffi_pubkycore_fn_func_parse_auth_url`` with ``url``.

    The argument is lowered as a string; the result is lifted into a list of
    strings.
    """
    url_arg = _UniffiConverterString.lower(url)
    raw = _rust_call(_UniffiLib.uniffi_pubkycore_fn_func_parse_auth_url, url_arg)
    return _UniffiConverterSequenceString.lift(raw)
def publish(record_name: "str", record_content: "str", secret_key: "str") -> "typing.List[str]":
    """Call ``uniffi_pubkycore_fn_func_publish`` with ``record_name``, ``record_content`` and ``secret_key``.

    All arguments are lowered as strings; the result is lifted into a list of
    strings.
    """
    record_name_arg = _UniffiConverterString.lower(record_name)
    record_content_arg = _UniffiConverterString.lower(record_content)
    secret_key_arg = _UniffiConverterString.lower(secret_key)
    raw = _rust_call(
        _UniffiLib.uniffi_pubkycore_fn_func_publish,
        record_name_arg,
        record_content_arg,
        secret_key_arg,
    )
    return _UniffiConverterSequenceString.lift(raw)
def publish_https(record_name: "str", target: "str", secret_key: "str") -> "typing.List[str]":
    """Call ``uniffi_pubkycore_fn_func_publish_https`` with ``record_name``, ``target`` and ``secret_key``.

    All arguments are lowered as strings; the result is lifted into a list of
    strings.
    """
    record_name_arg = _UniffiConverterString.lower(record_name)
    target_arg = _UniffiConverterString.lower(target)
    secret_key_arg = _UniffiConverterString.lower(secret_key)
    raw = _rust_call(
        _UniffiLib.uniffi_pubkycore_fn_func_publish_https,
        record_name_arg,
        target_arg,
        secret_key_arg,
    )
    return _UniffiConverterSequenceString.lift(raw)
def put(url: "str", content: "str") -> "typing.List[str]":
    """Call ``uniffi_pubkycore_fn_func_put`` with ``url`` and ``content``.

    Both arguments are lowered as strings; the result is lifted into a list
    of strings.
    """
    url_arg = _UniffiConverterString.lower(url)
    content_arg = _UniffiConverterString.lower(content)
    raw = _rust_call(_UniffiLib.uniffi_pubkycore_fn_func_put, url_arg, content_arg)
    return _UniffiConverterSequenceString.lift(raw)
def remove_event_listener():
    """Call ``uniffi_pubkycore_fn_func_remove_event_listener`` (no arguments, no return value)."""
    native_fn = _UniffiLib.uniffi_pubkycore_fn_func_remove_event_listener
    _rust_call(native_fn)
def resolve(public_key: "str") -> "typing.List[str]":
    """Call ``uniffi_pubkycore_fn_func_resolve`` with ``public_key``.

    The argument is lowered as a string; the result is lifted into a list of
    strings.
    """
    public_key_arg = _UniffiConverterString.lower(public_key)
    raw = _rust_call(_UniffiLib.uniffi_pubkycore_fn_func_resolve, public_key_arg)
    return _UniffiConverterSequenceString.lift(raw)
def resolve_https(public_key: "str") -> "typing.List[str]":
    """Call ``uniffi_pubkycore_fn_func_resolve_https`` with ``public_key``.

    The argument is lowered as a string; the result is lifted into a list of
    strings.
    """
    public_key_arg = _UniffiConverterString.lower(public_key)
    raw = _rust_call(_UniffiLib.uniffi_pubkycore_fn_func_resolve_https, public_key_arg)
    return _UniffiConverterSequenceString.lift(raw)
def session(pubky: "str") -> "typing.List[str]":
    """Call ``uniffi_pubkycore_fn_func_session`` with ``pubky``.

    The argument is lowered as a string; the result is lifted into a list of
    strings.
    """
    pubky_arg = _UniffiConverterString.lower(pubky)
    raw = _rust_call(_UniffiLib.uniffi_pubkycore_fn_func_session, pubky_arg)
    return _UniffiConverterSequenceString.lift(raw)
def set_event_listener(listener: "EventListener"):
    """Call ``uniffi_pubkycore_fn_func_set_event_listener`` with ``listener``.

    The listener is lowered to a callback handle before the call; nothing is
    returned.
    """
    listener_handle = _UniffiConverterCallbackInterfaceEventListener.lower(listener)
    _rust_call(_UniffiLib.uniffi_pubkycore_fn_func_set_event_listener, listener_handle)
def sign_in(secret_key: "str") -> "typing.List[str]":
    """Call ``uniffi_pubkycore_fn_func_sign_in`` with ``secret_key``.

    The argument is lowered as a string; the result is lifted into a list of
    strings.
    """
    secret_key_arg = _UniffiConverterString.lower(secret_key)
    raw = _rust_call(_UniffiLib.uniffi_pubkycore_fn_func_sign_in, secret_key_arg)
    return _UniffiConverterSequenceString.lift(raw)
def sign_out(secret_key: "str") -> "typing.List[str]":
    """Call ``uniffi_pubkycore_fn_func_sign_out`` with ``secret_key``.

    The argument is lowered as a string; the result is lifted into a list of
    strings.
    """
    secret_key_arg = _UniffiConverterString.lower(secret_key)
    raw = _rust_call(_UniffiLib.uniffi_pubkycore_fn_func_sign_out, secret_key_arg)
    return _UniffiConverterSequenceString.lift(raw)
def sign_up(secret_key: "str", homeserver: "str") -> "typing.List[str]":
    """Call ``uniffi_pubkycore_fn_func_sign_up`` with ``secret_key`` and ``homeserver``.

    Both arguments are lowered as strings; the result is lifted into a list
    of strings.
    """
    secret_key_arg = _UniffiConverterString.lower(secret_key)
    homeserver_arg = _UniffiConverterString.lower(homeserver)
    raw = _rust_call(_UniffiLib.uniffi_pubkycore_fn_func_sign_up, secret_key_arg, homeserver_arg)
    return _UniffiConverterSequenceString.lift(raw)
def switch_network(use_testnet: "bool") -> "typing.List[str]":
    """Call ``uniffi_pubkycore_fn_func_switch_network`` with ``use_testnet``.

    The argument is lowered through the bool converter; the result is lifted
    into a list of strings.
    """
    use_testnet_arg = _UniffiConverterBool.lower(use_testnet)
    raw = _rust_call(_UniffiLib.uniffi_pubkycore_fn_func_switch_network, use_testnet_arg)
    return _UniffiConverterSequenceString.lift(raw)
# Names exported by a star-import of this module: the error type, the public
# wrapper functions, and the two public classes.
# NOTE: "list" is among the exported names, so a star-import rebinds `list`
# in the importing namespace to the wrapper function defined in this module.
__all__ = [
"InternalError",
"auth",
"create_recovery_file",
"decrypt_recovery_file",
"delete_file",
"generate_secret_key",
"get",
"get_public_key_from_secret_key",
"list",
"parse_auth_url",
"publish",
"publish_https",
"put",
"remove_event_listener",
"resolve",
"resolve_https",
"session",
"set_event_listener",
"sign_in",
"sign_out",
"sign_up",
"switch_network",
"EventNotifier",
"EventListener",
]