Package hyveos_sdk

Sub-modules

hyveos_sdk.connection
hyveos_sdk.protocol
hyveos_sdk.services

Classes

class AppsService (conn: grpc.aio._base_channel.Channel)
Expand source code
class AppsService:
    """
    Handle for the application management service.

    Wraps the gRPC `AppsStub` and offers coroutines for deploying, listing
    and stopping apps on peers in the network.
    """

    def __init__(self, conn: Channel):
        self.stub = AppsStub(conn)
        # A single `Empty` message is kept around for requests without a payload.
        self.empty = Empty()

    async def deploy(
        self,
        image: str,
        local: bool,
        ports: Iterable[int] = [],
        peer_id: Optional[str] = None,
    ) -> str:
        """
        Deploy an application packaged as a docker image to a peer.

        Omit `peer_id` to deploy to self.

        Parameters
        ----------
        image : str
            A docker image name, can contain a tag, e.g. `my-docker-image:latest`
        local : bool
            Whether the image is available locally, without needing to pull it from a registry
        ports : Iterable[int], optional
            Ports to expose on the container (default: [])
        peer_id : str, optional
            The peer_id of the target node or None to deploy to self (default: None)

        Returns
        -------
        app_id : str
            The ULID of the deployed application
        """
        target = None if peer_id is None else Peer(peer_id=peer_id)
        docker_app = DockerApp(image=DockerImage(name=image), ports=ports)
        request = DeployAppRequest(app=docker_app, local=local, peer=target)
        deployed = await self.stub.Deploy(request)
        return deployed.ulid

    async def list_running(self, peer_id: Optional[str] = None) -> Iterable[RunningApp]:
        """
        List the apps running on a peer in the network.

        Omit `peer_id` to list the apps running on self.

        Parameters
        ----------
        peer_id : str, optional
            The peer_id of the target node or None to list apps on self (default: None)

        Returns
        -------
        apps : Iterable[RunningApp]
            The `apps` field of the service's response
        """
        target = None if peer_id is None else Peer(peer_id=peer_id)
        reply = await self.stub.ListRunning(ListRunningAppsRequest(peer=target))
        return reply.apps

    async def stop(self, app_id: str, peer_id: Optional[str] = None):
        """
        Stop the running app with the given ID on a peer in the network.

        Omit `peer_id` to stop the app on self.

        Parameters
        ----------
        app_id : str
            The id of the app to stop
        peer_id : str, optional
            The peer_id of the target node or None to stop the app on self (default: None)
        """
        target = None if peer_id is None else Peer(peer_id=peer_id)
        request = StopAppRequest(id=ID(ulid=app_id), peer=target)
        await self.stub.Stop(request)

    async def get_own_app_id(self) -> str:
        """
        Return the ID of the current app.

        This can only be called from a running app.

        Returns
        -------
        app_id : str
            The id of the current app
        """
        own = await self.stub.GetOwnAppId(self.empty)
        return own.ulid

A handle to the application management service.

Exposes methods to interact with the application management service, such as deploying and stopping apps on peers in the network.

Methods

async def deploy(self,
image: str,
local: bool,
ports: Iterable[int] = [],
peer_id: str | None = None) ‑> str
Expand source code
async def deploy(
    self,
    image: str,
    local: bool,
    ports: Iterable[int] = [],
    peer_id: Optional[str] = None,
) -> str:
    """
    Deploy an application packaged as a docker image to a peer.

    Omit `peer_id` to deploy to self.

    Parameters
    ----------
    image : str
        A docker image name, can contain a tag, e.g. `my-docker-image:latest`
    local : bool
        Whether the image is available locally, without needing to pull it from a registry
    ports : Iterable[int], optional
        Ports to expose on the container (default: [])
    peer_id : str, optional
        The peer_id of the target node or None to deploy to self (default: None)

    Returns
    -------
    app_id : str
        The ULID of the deployed application
    """
    target = None if peer_id is None else Peer(peer_id=peer_id)
    docker_app = DockerApp(image=DockerImage(name=image), ports=ports)
    request = DeployAppRequest(app=docker_app, local=local, peer=target)
    deployed = await self.stub.Deploy(request)
    return deployed.ulid

Deploys an application in a docker image to a peer in the network.

Returns the ULID of the deployed application.

To deploy to self, leave the peer_id argument empty.

Parameters

image : str
A docker image name, can contain a tag, e.g. my-docker-image:latest
local : bool
Whether the image is available locally, without needing to pull it from a registry
ports : Iterable[int], optional
Ports to expose on the container (default: [])
peer_id : str, optional
The peer_id of the target node or None to deploy to self (default: None)

Returns

app_id : str
The id of the deployed application
async def get_own_app_id(self) ‑> str
Expand source code
async def get_own_app_id(self) -> str:
    """
    Return the ID of the current app.

    This can only be called from a running app.

    Returns
    -------
    app_id : str
        The id of the current app
    """
    own = await self.stub.GetOwnAppId(self.empty)
    return own.ulid

Get the ID of the current app.

This can only be called from a running app.

Returns

app_id : str
The id of the current app
async def list_running(self, peer_id: str | None = None) ‑> Iterable[bridge_pb2.RunningApp]
Expand source code
async def list_running(self, peer_id: Optional[str] = None) -> Iterable[RunningApp]:
    """
    List the apps running on a peer in the network.

    Omit `peer_id` to list the apps running on self.

    Parameters
    ----------
    peer_id : str, optional
        The peer_id of the target node or None to list apps on self (default: None)

    Returns
    -------
    apps : Iterable[RunningApp]
        The `apps` field of the service's response
    """
    target = None if peer_id is None else Peer(peer_id=peer_id)
    reply = await self.stub.ListRunning(ListRunningAppsRequest(peer=target))
    return reply.apps

Lists the running apps on a peer in the network.

To list the running apps on self, leave the peer_id argument empty.

Parameters

peer_id : str, optional
The peer_id of the target node or None to list apps on self (default: None)

Returns

apps : Iterable[RunningApp]
The running applications
async def stop(self, app_id: str, peer_id: str | None = None)
Expand source code
async def stop(self, app_id: str, peer_id: Optional[str] = None):
    """
    Stop the running app with the given ID on a peer in the network.

    Omit `peer_id` to stop the app on self.

    Parameters
    ----------
    app_id : str
        The id of the app to stop
    peer_id : str, optional
        The peer_id of the target node or None to stop the app on self (default: None)
    """
    target = None if peer_id is None else Peer(peer_id=peer_id)
    request = StopAppRequest(id=ID(ulid=app_id), peer=target)
    await self.stub.Stop(request)

Stops a running app with an ID on a peer in the network.

To stop the running app on self, leave the peer_id argument empty.

Parameters

app_id : str
The id of the app to stop
peer_id : str, optional
The peer_id of the target node or None to stop the app on self (default: None)
class Connection (socket_path: pathlib.Path | str | None = None,
shared_dir_path: pathlib.Path | str | None = None,
uri: str | None = None)
Expand source code
class Connection:
    """
    A connection to the hyveOS runtime.

    This class is used to establish a connection to the HyveOS runtime.
    It is used as an async context manager to ensure that the connection is properly closed
    when it is no longer needed.

    By default, the connection to the HyveOS runtime will be made through the application bridge,
    i.e., the Unix domain socket specified by the `HYVEOS_BRIDGE_SOCKET` environment variable will be used to communicate with the runtime.

    If another connection type is desired, you can specify either the `socket_path` and `shared_dir_path` parameters,
    or the `uri` parameter when creating the connection.

    Example
    -------

    ```python
    from hyveos_sdk import Connection

    async def main():
        async with Connection() as conn:
            peer_id = await conn.get_id()

            print(f'My peer ID: {peer_id}')
    ```
    """

    _conn: grpc.aio.Channel
    _shared_dir_path: Optional[Path]
    _uri: Optional[str]
    _session: Optional[aiohttp.ClientSession]

    def __init__(
        self,
        socket_path: Optional[Path | str] = None,
        shared_dir_path: Optional[Path | str] = None,
        uri: Optional[str] = None,
    ):
        """
        Establishes a connection to the HyveOS runtime.

        By default, the connection to the HyveOS runtime will be made through the application bridge,
        i.e., the Unix domain socket specified by the `HYVEOS_BRIDGE_SOCKET` environment variable will be used to communicate with the runtime.

        If another connection type is desired, you can specify either the `socket_path` and `shared_dir_path` parameters,
        or the `uri` parameter.

        Parameters
        ----------
        socket_path : Path | str, optional
            A custom path to a Unix domain socket to connect to.
            The socket path should point to a Unix domain socket that the HyveOS runtime is listening on.

            Mutually exclusive with `uri`. If `socket_path` is provided, `shared_dir_path` must also be provided.
        shared_dir_path : Path | str, optional
            A path to a directory where the runtime expects files provided with the file-transfer service to be stored.
            The path is resolved strictly, so the directory must exist.

            Mutually exclusive with `uri`. Must be provided if `socket_path` is provided.
        uri : str, optional
            A URI to connect to over the network.
            The URI should be in the format `http://<host>:<port>`.
            A HyveOS runtime should be listening at the given address.

            Mutually exclusive with `socket_path` and `shared_dir_path`.

        Raises
        ------
        ValueError
            If `socket_path` is provided without `shared_dir_path`,
            if both `socket_path` and `uri` are provided,
            or if `shared_dir_path` is provided without `socket_path` (with or without `uri`).
        FileNotFoundError
            If `shared_dir_path` is provided but does not exist.
        KeyError
            If no arguments are provided and the `HYVEOS_BRIDGE_SOCKET` environment variable is not set.
        """

        shared_dir_path = (
            Path(shared_dir_path)
            if isinstance(shared_dir_path, str)
            else shared_dir_path
        )
        # NOTE: the path is resolved before the argument combination is validated,
        # so a missing directory is reported before an invalid combination.
        self._shared_dir_path = (
            shared_dir_path.resolve(strict=True)
            if shared_dir_path is not None
            else None
        )
        self._uri = uri
        self._session = None

        if socket_path is not None:
            # Custom Unix domain socket: requires a shared directory and excludes `uri`.
            if shared_dir_path is None:
                raise ValueError(
                    '`shared_dir_path` must be provided when `socket_path` is provided'
                )
            if uri is not None:
                raise ValueError(
                    'Only one of `socket_path` and `shared_dir_path`, or `uri` can be provided'
                )
            self._conn = grpc.aio.insecure_channel(
                f'unix://{socket_path}',
                options=(('grpc.default_authority', 'localhost'),),
            )
        elif uri is not None:
            # Network connection: an HTTP session is created alongside the gRPC channel.
            if shared_dir_path is not None:
                raise ValueError(
                    '`shared_dir_path` cannot be provided when `uri` is provided'
                )
            self._conn = grpc.aio.insecure_channel(uri)
            self._session = aiohttp.ClientSession()
        elif shared_dir_path is not None:
            raise ValueError(
                '`shared_dir_path` cannot be provided without `socket_path`'
            )
        else:
            # Default: the application bridge socket announced via the environment.
            bridge_socket_path = os.environ['HYVEOS_BRIDGE_SOCKET']
            self._conn = grpc.aio.insecure_channel(
                f'unix://{bridge_socket_path}',
                options=(('grpc.default_authority', 'localhost'),),
            )

    async def __aenter__(self) -> 'OpenedConnection':
        """Enter the context and return an `OpenedConnection` wrapping this connection."""
        return OpenedConnection(self)

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """
        Close the gRPC channel and, if one was created, the HTTP session.

        The session is closed even when closing the channel raises, so that it is not leaked.
        """
        try:
            await self._conn.close()
        finally:
            if self._session is not None:
                await self._session.close()

A connection to the hyveOS runtime.

This class is used to establish a connection to the HyveOS runtime. It is used as a context manager to ensure that the connection is properly closed when it is no longer needed.

By default, the connection to the HyveOS runtime will be made through the application bridge, i.e., the Unix domain socket specified by the HYVEOS_BRIDGE_SOCKET environment variable will be used to communicate with the runtime.

If another connection type is desired, you can specify either the socket_path and shared_dir_path parameters, or the uri parameter when creating the connection.

Example

from hyveos_sdk import Connection

async def main():
    async with Connection() as conn:
        peer_id = await conn.get_id()

        print(f'My peer ID: {peer_id}')

Establishes a connection to the HyveOS runtime.

By default, the connection to the HyveOS runtime will be made through the application bridge, i.e., the Unix domain socket specified by the HYVEOS_BRIDGE_SOCKET environment variable will be used to communicate with the runtime.

If another connection type is desired, you can specify either the socket_path and shared_dir_path parameters, or the uri parameter.

Parameters

socket_path : Path | str, optional

A custom path to a Unix domain socket to connect to. The socket path should point to a Unix domain socket that the HyveOS runtime is listening on.

Mutually exclusive with uri. If socket_path is provided, shared_dir_path must also be provided.

shared_dir_path : Path | str, optional

A path to a directory where the runtime expects files provided with the file-transfer service to be stored.

Mutually exclusive with uri. Must be provided if socket_path is provided.

uri : str, optional

A URI to connect to over the network. The URI should be in the format http://<host>:<port>. A HyveOS runtime should be listening at the given address.

Mutually exclusive with socket_path and shared_dir_path.

Raises

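ValueError
If the arguments are combined inconsistently: both socket_path and uri are provided, socket_path is provided without shared_dir_path, or shared_dir_path is provided without socket_path (including together with uri).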
class DebugService (conn: grpc.aio._base_channel.Channel)
Expand source code
class DebugService:
    """
    Handle for the debug service.

    Wraps the gRPC `DebugStub` and offers subscriptions to mesh topology
    events and message debug events.
    """

    def __init__(self, conn: Channel):
        self.stub = DebugStub(conn)

    def subscribe_mesh_topology(self) -> ManagedStream[MeshTopologyEvent]:
        """
        Subscribe to mesh topology events.

        The returned stream emits an event whenever the mesh topology changes.

        For each peer in the mesh, it is guaranteed that the stream will first emit an `init`
        event when it enters the mesh, followed only by `discovered` and `lost` events,
        until the peer leaves the mesh.

        Returns
        -------
        stream : ManagedStream[MeshTopologyEvent]
            A stream of mesh topology events
        """
        return ManagedStream(self.stub.SubscribeMeshTopology(Empty()))

    def subscribe_messages(self) -> ManagedStream[MessageDebugEvent]:
        """
        Subscribe to message debug events.

        The returned stream emits an event whenever a request, response,
        or gossipsub message is sent by a peer in the mesh.

        Returns
        -------
        stream : ManagedStream[MessageDebugEvent]
            A stream of message debug events
        """
        return ManagedStream(self.stub.SubscribeMessages(Empty()))

A handle to the debug service.

Exposes methods to interact with the debug service, such as subscribing to mesh topology events and message debug events.

Methods

def subscribe_mesh_topology(self) ‑> ManagedStream[bridge_pb2.MeshTopologyEvent]
Expand source code
def subscribe_mesh_topology(self) -> ManagedStream[MeshTopologyEvent]:
    """
    Subscribe to mesh topology events.

    The returned stream emits an event whenever the mesh topology changes.

    For each peer in the mesh, it is guaranteed that the stream will first emit an `init`
    event when it enters the mesh, followed only by `discovered` and `lost` events,
    until the peer leaves the mesh.

    Returns
    -------
    stream : ManagedStream[MeshTopologyEvent]
        A stream of mesh topology events
    """
    return ManagedStream(self.stub.SubscribeMeshTopology(Empty()))

Subscribes to mesh topology events.

Returns a stream of mesh topology events. The stream will emit an event whenever the mesh topology changes.

For each peer in the mesh, it is guaranteed that the stream will first emit an init event when it enters the mesh, followed only by discovered and lost events, until the peer leaves the mesh.

Returns

stream : ManagedStream[MeshTopologyEvent]
A stream of mesh topology events
def subscribe_messages(self) ‑> ManagedStream[bridge_pb2.MessageDebugEvent]
Expand source code
def subscribe_messages(self) -> ManagedStream[MessageDebugEvent]:
    """
    Subscribe to message debug events.

    The returned stream emits an event whenever a request, response,
    or gossipsub message is sent by a peer in the mesh.

    Returns
    -------
    stream : ManagedStream[MessageDebugEvent]
        A stream of message debug events
    """
    return ManagedStream(self.stub.SubscribeMessages(Empty()))

Subscribes to message debug events.

Returns a stream of message debug events. The stream will emit an event whenever a request, response, or gossipsub message is sent by a peer in the mesh.

Returns

stream : ManagedStream[MessageDebugEvent]
A stream of message debug events
class DiscoveryService (conn: grpc.aio._base_channel.Channel)
Expand source code
class DiscoveryService:
    """
    Handle for the discovery service.

    Wraps the gRPC `DiscoveryStub` and offers methods for marking the local
    runtime as a provider for a discovery key, for looking up the providers
    of a discovery key, and for withdrawing as a provider.
    """

    def __init__(self, conn: Channel):
        self.stub = DiscoveryStub(conn)

    async def provide(self, topic: str, key: str | bytes) -> None:
        """
        Mark the local runtime as a provider for a discovery key.

        Parameters
        ----------
        topic : str
            The topic of the discovery key
        key : str | bytes
            The key within the topic; it is passed through `enc` before being sent
        """
        discovery_key = DHTKey(topic=Topic(topic=topic), key=enc(key))
        await self.stub.Provide(discovery_key)

    def get_providers(self, topic: str, key: str | bytes) -> ManagedStream[Peer]:
        """
        Get the providers for a discovery key.

        Parameters
        ----------
        topic : str
            The topic of the discovery key
        key : str | bytes
            The key within the topic; it is passed through `enc` before being sent

        Returns
        -------
        stream : ManagedStream[Peer]
            A stream of providers for the discovery key.
        """
        discovery_key = DHTKey(topic=Topic(topic=topic), key=enc(key))
        return ManagedStream(self.stub.GetProviders(discovery_key))

    async def stop_providing(self, topic: str, key: str | bytes) -> None:
        """
        Stop providing a discovery key.

        Parameters
        ----------
        topic : str
            The topic of the discovery key
        key : str | bytes
            The key within the topic; it is passed through `enc` before being sent
        """
        discovery_key = DHTKey(topic=Topic(topic=topic), key=enc(key))
        await self.stub.StopProviding(discovery_key)

A handle to the discovery service.

Exposes methods to interact with the discovery service, such as marking the local runtime as a provider for a discovery key or getting the providers for a discovery key.

Methods

def get_providers(self, topic: str, key: str | bytes) ‑> ManagedStream[bridge_pb2.Peer]
Expand source code
def get_providers(self, topic: str, key: str | bytes) -> ManagedStream[Peer]:
    """
    Get the providers for a discovery key.

    Parameters
    ----------
    topic : str
        The topic of the discovery key
    key : str | bytes
        The key within the topic; it is passed through `enc` before being sent

    Returns
    -------
    stream : ManagedStream[Peer]
        A stream of providers for the discovery key.
    """
    discovery_key = DHTKey(topic=Topic(topic=topic), key=enc(key))
    return ManagedStream(self.stub.GetProviders(discovery_key))

Gets the providers for a discovery key.

Returns

stream : ManagedStream[Peer]
A stream of providers for the discovery key.
async def provide(self, topic: str, key: str | bytes) ‑> None
Expand source code
async def provide(self, topic: str, key: str | bytes) -> None:
    """
    Mark the local runtime as a provider for a discovery key.

    Parameters
    ----------
    topic : str
        The topic of the discovery key
    key : str | bytes
        The key within the topic; it is passed through `enc` before being sent
    """
    discovery_key = DHTKey(topic=Topic(topic=topic), key=enc(key))
    await self.stub.Provide(discovery_key)

Marks the local runtime as a provider for a discovery key.

async def stop_providing(self, topic: str, key: str | bytes) ‑> None
Expand source code
async def stop_providing(self, topic: str, key: str | bytes) -> None:
    """
    Stop providing a discovery key.

    Parameters
    ----------
    topic : str
        The topic of the discovery key
    key : str | bytes
        The key within the topic; it is passed through `enc` before being sent
    """
    discovery_key = DHTKey(topic=Topic(topic=topic), key=enc(key))
    await self.stub.StopProviding(discovery_key)

Stops providing a discovery key.

class FileTransferService
Expand source code
class FileTransferService(ABC):
    """
    Abstract handle for the file transfer service.

    Declares the methods for interacting with the file transfer service,
    namely publishing files and getting files.
    """

    @abstractmethod
    async def publish(self, file_path: Path | str) -> FileToken:
        """
        Publish a file in the mesh network and return its content ID.

        Before it's published, the file is copied to the shared directory if it is not already
        there. By default, the shared directory is defined by the `HYVEOS_BRIDGE_SHARED_DIR`
        environment variable. However, it can be set to a custom path when initializing the
        connection to the HyveOS runtime.

        Parameters
        ----------
        file_path : Path | str
            The local path to the file to publish

        Returns
        -------
        file_token : FileToken
            The content ID of the published file
        """
        ...

    @abstractmethod
    async def get(self, file_token: FileToken) -> FilePath:
        """
        Retrieve a file from the mesh network and return its path.

        When the local runtime doesn't own a copy of this file yet, it downloads it from one
        of its peers. Afterwards, or if it was already locally available, the file is copied
        into the shared directory, which is defined by the `HYVEOS_BRIDGE_SHARED_DIR`
        environment variable.

        Parameters
        ----------
        file_token : FileToken
            The content ID of the file to retrieve

        Returns
        -------
        file_path : FilePath
            The local path to the retrieved file
        """
        ...

A handle to the file transfer service.

Exposes methods to interact with the file transfer service, such as publishing and getting files.

Ancestors

  • abc.ABC

Subclasses

Methods

async def get(self,
file_token: FileToken) ‑> bridge_pb2.FilePath
Expand source code
@abstractmethod
async def get(self, file_token: FileToken) -> FilePath:
    """
    Retrieve a file from the mesh network and return its path.

    When the local runtime doesn't own a copy of this file yet, it downloads it from one
    of its peers. Afterwards, or if it was already locally available, the file is copied
    into the shared directory, which is defined by the `HYVEOS_BRIDGE_SHARED_DIR`
    environment variable.

    Parameters
    ----------
    file_token : FileToken
        The content ID of the file to retrieve

    Returns
    -------
    file_path : FilePath
        The local path to the retrieved file
    """
    ...

Retrieves a file from the mesh network and returns its path.

When the local runtime doesn't own a copy of this file yet, it downloads it from one of its peers. Afterwards, or if it was already locally available, the file is copied into the shared directory, which is defined by the HYVEOS_BRIDGE_SHARED_DIR environment variable.

Parameters

file_token : FileToken
The content ID of the file to retrieve

Returns

file_path : FilePath
The local path to the retrieved file
async def publish(self, file_path: pathlib.Path | str) ‑> FileToken
Expand source code
@abstractmethod
async def publish(self, file_path: Path | str) -> FileToken:
    """
    Publish a file in the mesh network and return its content ID.

    Before it's published, the file is copied to the shared directory if it is not already
    there. By default, the shared directory is defined by the `HYVEOS_BRIDGE_SHARED_DIR`
    environment variable. However, it can be set to a custom path when initializing the
    connection to the HyveOS runtime.

    Parameters
    ----------
    file_path : Path | str
        The local path to the file to publish

    Returns
    -------
    file_token : FileToken
        The content ID of the published file
    """
    ...

Publishes a file in the mesh network and returns its content ID.

Before it's published, the file is copied to the shared directory if it is not already there. By default, the shared directory is defined by the HYVEOS_BRIDGE_SHARED_DIR environment variable. However, it can be set to a custom path when initializing the connection to the HyveOS runtime.

Parameters

file_path : Path | str
The local path to the file to publish

Returns

file_token : FileToken
The content ID of the published file
class KVService (conn: grpc.aio._base_channel.Channel)
Expand source code
class KVService:
    """
    A handle to the distributed key-value store service.

    Exposes methods to interact with the key-value store service,
    such as putting records into the key-value store or getting records from it.
    """

    def __init__(self, conn: Channel):
        self.stub = KVStub(conn)

    async def put_record(self, topic: str, key: str | bytes, value: str) -> None:
        """
        Puts a record into the key-value store.

        Parameters
        ----------
        topic : str
            The topic of the record
        key : str | bytes
            The key of the record
        value : str
            The value of the record
        """
        await self.stub.PutRecord(
            DHTRecord(
                key=DHTKey(topic=Topic(topic=topic), key=enc(key)),
                value=Data(data=enc(value)),
            )
        )

    async def get_record(self, topic: str, key: str | bytes) -> bytes | None:
        """
        Gets a record from the key-value store.

        Parameters
        ----------
        topic : str
            The topic of the record to retrieve
        key : str | bytes
            The key of the record to retrieve

        Returns
        -------
        value : bytes | None
            The value of the record or `None` if the record is not found
        """
        record = await self.stub.GetRecord(
            DHTKey(topic=Topic(topic=topic), key=enc(key))
        )
        # Reading an unset protobuf message field yields an empty default message,
        # never `None`, so presence has to be checked explicitly with `HasField`.
        if record.HasField('data'):
            return record.data.data
        return None

    async def remove_record(self, topic: str, key: str | bytes) -> None:
        """
        Removes a record from the key-value store.

        This only applies to the local node and only affects the network once the record expires.

        Parameters
        ----------
        topic : str
            The topic of the record to remove
        key : str | bytes
            The key of the record to remove
        """
        await self.stub.RemoveRecord(DHTKey(topic=Topic(topic=topic), key=enc(key)))

A handle to the distributed key-value store service.

Exposes methods to interact with the key-value store service, such as putting records into the key-value store or getting records from it.

Methods

async def get_record(self, topic: str, key: str | bytes) ‑> bytes | None
Expand source code
async def get_record(self, topic: str, key: str | bytes) -> bytes | None:
    """
    Gets a record from the key-value store.

    Parameters
    ----------
    topic : str
        The topic of the record to retrieve
    key : str | bytes
        The key of the record to retrieve

    Returns
    -------
    value : bytes | None
        The value of the record or `None` if the record is not found
    """
    record = await self.stub.GetRecord(
        DHTKey(topic=Topic(topic=topic), key=enc(key))
    )
    # Reading an unset protobuf message field yields an empty default message,
    # never `None`, so presence has to be checked explicitly with `HasField`.
    if record.HasField('data'):
        return record.data.data
    return None

Gets a record from the key-value store.

Parameters

topic : str
The topic of the record to retrieve
key : str | bytes
The key of the record to retrieve

Returns

value : bytes | None
The value of the record or None if the record is not found
async def put_record(self, topic: str, key: str | bytes, value: str) ‑> None
Expand source code
async def put_record(self, topic: str, key: str | bytes, value: str) -> None:
    """
    Put a record into the key-value store.

    Parameters
    ----------
    topic : str
        The topic of the record
    key : str | bytes
        The key of the record
    value : str
        The value of the record
    """
    record_key = DHTKey(topic=Topic(topic=topic), key=enc(key))
    record_value = Data(data=enc(value))
    await self.stub.PutRecord(DHTRecord(key=record_key, value=record_value))

Puts a record into the key-value store.

Parameters

topic : str
The topic of the record
key : str | bytes
The key of the record
value : str
The value of the record
async def remove_record(self, topic: str, key: str | bytes) ‑> None
Expand source code
async def remove_record(self, topic: str, key: str | bytes) -> None:
    """
    Remove a record from the key-value store.

    This only applies to the local node and only affects the network once the record expires.

    Parameters
    ----------
    topic : str
        The topic of the record to remove
    key : str | bytes
        The key of the record to remove
    """
    record_key = DHTKey(topic=Topic(topic=topic), key=enc(key))
    await self.stub.RemoveRecord(record_key)

Removes a record from the key-value store.

This only applies to the local node and only affects the network once the record expires.

class LocalKVService (conn: grpc.aio._base_channel.Channel)
Expand source code
class LocalKVService:
    """
    A handle to the local key-value store service.

    Exposes methods to interact with the key-value store service,
    like putting and getting key-value records.
    The key-value store is local to the runtime and is not shared with other runtimes.
    However, it is persisted across restarts of the runtime.
    """

    def __init__(self, conn: Channel):
        self.stub = LocalKVStub(conn)

    async def put(self, key: str, value: str | bytes) -> bytes | None:
        """
        Puts a record into the key-value store.

        Returns the previous value if it exists, otherwise `None`.
        This only has local effects and does not affect other runtimes.
        However, the record is persisted across restarts of the runtime.

        Parameters
        ----------
        key : str
            The key of the record
        value : str | bytes
            The value of the record

        Returns
        -------
        value : bytes | None
            The previous value of the record or `None` if it did not exist
        """
        previous = await self.stub.Put(
            LocalKVRecord(key=key, value=Data(data=enc(value)))
        )
        # Reading an unset protobuf message field yields an empty default message,
        # never `None`, so presence has to be checked explicitly with `HasField`.
        if previous.HasField('data'):
            return previous.data.data
        return None

    async def get(self, key: str) -> bytes | None:
        """
        Gets a record from the key-value store if it exists.

        This will not return values from other runtimes.

        Parameters
        ----------
        key : str
            The key of the record to retrieve

        Returns
        -------
        value : bytes | None
            The value of the record or `None` if the record is not found
        """
        record = await self.stub.Get(LocalKVKey(key=key))
        # Same presence check as in `put`: an unset message field is never `None`.
        if record.HasField('data'):
            return record.data.data
        return None

A handle to the local key-value store service.

Exposes methods to interact with the key-value store service, like putting and getting key-value records. The key-value store is local to the runtime and is not shared with other runtimes. However, it is persisted across restarts of the runtime.

Methods

async def get(self, key: str) ‑> bytes | None
Expand source code
async def get(self, key: str) -> bytes | None:
    """
    Gets a record from the key-value store if it exists.

    This will not return values from other runtimes.

    Parameters
    ----------
    key : str
        The key of the record to retrieve

    Returns
    -------
    value : bytes | None
        The value of the record or `None` if the record is not found
    """
    record = await self.stub.Get(LocalKVKey(key=key))
    # A message-typed field of a generated protobuf message is never `None`:
    # reading an unset field yields an empty default message. Presence has
    # to be checked explicitly, otherwise a missing record would be
    # reported as `b''` instead of `None`.
    # NOTE(review): assumes the reply is a generated protobuf message with
    # an optional `data` field - confirm against the proto definition.
    if record.HasField('data'):
        return record.data.data
    return None

Gets a record from the key-value store if it exists.

This will not return values from other runtimes.

Parameters

key : str
The key of the record to retrieve

Returns

value : bytes | None
The value of the record or None if the record is not found
async def put(self, key: str, value: str | bytes) ‑> bytes | None
Expand source code
async def put(self, key: str, value: str | bytes) -> bytes | None:
    """
    Puts a record into the key-value store.

    Returns the previous value if it exists, otherwise `None`.
    This only has local effects and does not affect other runtimes.
    However, the record is persisted across restarts of the runtime.

    Parameters
    ----------
    key : str
        The key of the record
    value : str | bytes
        The value of the record

    Returns
    -------
    value : bytes | None
        The previous value of the record or `None` if it did not exist
    """
    record = await self.stub.Put(
        LocalKVRecord(key=key, value=Data(data=enc(value)))
    )
    # A message-typed field of a generated protobuf message is never `None`:
    # reading an unset field yields an empty default message. Presence has
    # to be checked explicitly, otherwise a missing previous value would be
    # reported as `b''` instead of `None`.
    # NOTE(review): assumes the reply is a generated protobuf message with
    # an optional `data` field - confirm against the proto definition.
    if record.HasField('data'):
        return record.data.data
    return None

Puts a record into the key-value store.

Returns the previous value if it exists, otherwise None. This only has local effects and does not affect other runtimes. However, the record is persisted across restarts of the runtime.

Parameters

key : str
The key of the record
value : str | bytes
The value of the record

Returns

value : bytes | None
The previous value of the record or None if it did not exist
class ManagedStream (stream)
Expand source code
class ManagedStream(Generic[T]):
    """
    Async context manager and async iterable wrapped around a stream object.

    Leaving the `async with` block calls `cancel()` on the wrapped stream.
    Asynchronous iteration is forwarded to the wrapped stream.
    """

    def __init__(self, stream):
        # The wrapped object has to provide `cancel()` and `__aiter__()`.
        self.stream = stream

    def __aiter__(self) -> AsyncIterator[T]:
        """Returns the asynchronous iterator of the wrapped stream."""
        wrapped = self.stream
        return wrapped.__aiter__()

    async def __aenter__(self):
        """Enters the context and hands back this wrapper itself."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Cancels the wrapped stream when the context is left."""
        wrapped = self.stream
        wrapped.cancel()

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

    class Mapping(Generic[KT, VT]):
        def __getitem__(self, key: KT) -> VT:
            ...
            # Etc.

This class can then be used as follows::

    def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default

Ancestors

  • typing.Generic
class NeighboursService (conn: grpc.aio._base_channel.Channel)
Expand source code
class NeighboursService:
    """
    A handle to the neighbours service.

    Offers two operations: a stream of neighbour events (`subscribe`) and a
    snapshot of the peer IDs of the current neighbours (`get`).
    """

    def __init__(self, conn: Channel):
        # The request message is built once here and passed to every call below.
        self.stub = NeighboursStub(conn)
        self.empty = Empty()

    def subscribe(self) -> ManagedStream[NeighbourEvent]:
        """
        Subscribes to neighbour events.

        The returned stream emits an event whenever the local runtime detects a change in the
        set of neighbours. An `init` event is guaranteed to be emitted directly after
        subscribing; every later event is either a `discovered` or a `lost` event.

        Returns
        -------
        stream : ManagedStream[NeighbourEvent]
            A stream of neighbour events
        """
        return ManagedStream(self.stub.Subscribe(self.empty))

    async def get(self) -> list[str]:
        """
        Fetches the peer IDs of the current neighbours.

        Returns
        -------
        peers : list[str]
            The peer ID of every current neighbour
        """
        reply = await self.stub.Get(self.empty)
        return [neighbour.peer_id for neighbour in reply.peers]

A handle to the neighbours service.

Exposes methods to interact with the neighbours service, such as subscribing to neighbour events and getting the current set of neighbours.

Methods

async def get(self) ‑> list[str]
Expand source code
async def get(self) -> list[str]:
    """
    Fetches the peer IDs of the current neighbours.

    Returns
    -------
    peers : list[str]
        The peer ID of every current neighbour
    """
    reply = await self.stub.Get(self.empty)
    return [neighbour.peer_id for neighbour in reply.peers]

Returns the peer IDs of the current neighbours.

Returns

peers : list[str]
A list of peer IDs of the current neighbours
def subscribe(self) ‑> ManagedStream[bridge_pb2.NeighbourEvent]
Expand source code
def subscribe(self) -> ManagedStream[NeighbourEvent]:
    """
    Subscribes to neighbour events.

    The returned stream emits an event whenever the local runtime detects a change in the
    set of neighbours. An `init` event is guaranteed to be emitted directly after
    subscribing; every later event is either a `discovered` or a `lost` event.

    Returns
    -------
    stream : ManagedStream[NeighbourEvent]
        A stream of neighbour events
    """
    return ManagedStream(self.stub.Subscribe(self.empty))

Subscribes to neighbour events.

Returns a stream of neighbour events. The stream will emit an event whenever the local runtime detects a change in the set of neighbours. The stream is guaranteed to emit an init event directly after subscribing and only discovered and lost events afterwards.

Returns

stream : ManagedStream[NeighbourEvent]
A stream of neighbour events
class OpenedConnection (conn: Connection)
Expand source code
class OpenedConnection:
    """
    An opened connection to the HyveOS runtime.

    Instances hand out handles to the individual HyveOS services and can report the
    peer ID of the local runtime.

    An instance of this class is obtained by entering a `Connection` context manager.

    Example
    -------

    ```python
    from hyveos_sdk import Connection

    async def main():
        async with Connection() as conn:
            peer_id = await conn.get_id()

            print(f'My peer ID: {peer_id}')
    ```
    """

    _conn: grpc.aio.Channel
    _shared_dir_path: Optional[Path]
    _uri: Optional[str]
    _session: Optional[aiohttp.ClientSession]

    def __init__(self, conn: Connection):
        # Take over the state of the `Connection` this object was opened from.
        self._conn = conn._conn
        self._shared_dir_path = conn._shared_dir_path
        self._uri = conn._uri
        self._session = conn._session

    def get_apps_service(self) -> AppsService:
        """
        Creates a handle to the application management service.

        Returns
        -------
        AppsService
            A handle to the application management service.
        """
        apps_service = AppsService(self._conn)
        return apps_service

    def get_debug_service(self) -> DebugService:
        """
        Creates a handle to the debug service.

        Returns
        -------
        DebugService
            A handle to the debug service.
        """
        debug_service = DebugService(self._conn)
        return debug_service

    def get_discovery_service(self) -> DiscoveryService:
        """
        Creates a handle to the discovery service.

        Returns
        -------
        DiscoveryService
            A handle to the discovery service.
        """
        discovery_service = DiscoveryService(self._conn)
        return discovery_service

    def get_file_transfer_service(self) -> FileTransferService:
        """
        Creates a handle to the file transfer service.

        A `NetworkFileTransferService` is returned when both a URI and a session are set on
        this connection; otherwise a `GrpcFileTransferService` is returned.

        Returns
        -------
        FileTransferService
            A handle to the file transfer service.
        """
        if self._uri is None or self._session is None:
            return GrpcFileTransferService(self._conn, self._shared_dir_path)
        return NetworkFileTransferService(self._uri, self._session)

    def get_kv_service(self) -> KVService:
        """
        Creates a handle to the distributed key-value store service.

        Returns
        -------
        KVService
            A handle to the distributed key-value store service.
        """
        kv_service = KVService(self._conn)
        return kv_service

    def get_local_kv_service(self) -> LocalKVService:
        """
        Creates a handle to the local key-value store service.

        Returns
        -------
        LocalKVService
            A handle to the local key-value store service.
        """
        local_kv_service = LocalKVService(self._conn)
        return local_kv_service

    def get_neighbours_service(self) -> NeighboursService:
        """
        Creates a handle to the neighbours service.

        Returns
        -------
        NeighboursService
            A handle to the neighbours service.
        """
        neighbours_service = NeighboursService(self._conn)
        return neighbours_service

    def get_pub_sub_service(self) -> PubSubService:
        """
        Creates a handle to the pub-sub service.

        Returns
        -------
        PubSubService
            A handle to the pub-sub service.
        """
        pub_sub_service = PubSubService(self._conn)
        return pub_sub_service

    def get_request_response_service(self) -> RequestResponseService:
        """
        Creates a handle to the request-response service.

        Returns
        -------
        RequestResponseService
            A handle to the request-response service.
        """
        request_response_service = RequestResponseService(self._conn)
        return request_response_service

    async def get_id(self) -> str:
        """
        Asks the runtime for the peer ID of the local runtime.

        Returns
        -------
        str
            The peer ID of the runtime.
        """
        discovery_stub = DiscoveryStub(self._conn)
        own_peer = await discovery_stub.GetOwnId(Empty())
        return own_peer.peer_id

An opened connection to the HyveOS runtime.

This class provides access to the various services provided by HyveOS.

An instance of this class is obtained by entering a Connection context manager.

Example

from hyveos_sdk import Connection

async def main():
    async with Connection() as conn:
        peer_id = await conn.get_id()

        print(f'My peer ID: {peer_id}')

Methods

def get_apps_service(self) ‑> AppsService
Expand source code
def get_apps_service(self) -> AppsService:
    """
    Creates a handle to the application management service.

    Returns
    -------
    AppsService
        A handle to the application management service.
    """
    apps_service = AppsService(self._conn)
    return apps_service

Returns a handle to the application management service.

Returns

AppsService
A handle to the application management service.
def get_debug_service(self) ‑> DebugService
Expand source code
def get_debug_service(self) -> DebugService:
    """
    Creates a handle to the debug service.

    Returns
    -------
    DebugService
        A handle to the debug service.
    """
    debug_service = DebugService(self._conn)
    return debug_service

Returns a handle to the debug service.

Returns

DebugService
A handle to the debug service.
def get_discovery_service(self) ‑> DiscoveryService
Expand source code
def get_discovery_service(self) -> DiscoveryService:
    """
    Creates a handle to the discovery service.

    Returns
    -------
    DiscoveryService
        A handle to the discovery service.
    """
    discovery_service = DiscoveryService(self._conn)
    return discovery_service

Returns a handle to the discovery service.

Returns

DiscoveryService
A handle to the discovery service.
def get_file_transfer_service(self) ‑> FileTransferService
Expand source code
def get_file_transfer_service(self) -> FileTransferService:
    """
    Creates a handle to the file transfer service.

    A `NetworkFileTransferService` is returned when both a URI and a session are set on
    this connection; otherwise a `GrpcFileTransferService` is returned.

    Returns
    -------
    FileTransferService
        A handle to the file transfer service.
    """
    if self._uri is None or self._session is None:
        return GrpcFileTransferService(self._conn, self._shared_dir_path)
    return NetworkFileTransferService(self._uri, self._session)

Returns a handle to the file transfer service.

Returns

FileTransferService
A handle to the file transfer service.
async def get_id(self) ‑> str
Expand source code
async def get_id(self) -> str:
    """
    Asks the runtime for the peer ID of the local runtime.

    Returns
    -------
    str
        The peer ID of the runtime.
    """
    discovery_stub = DiscoveryStub(self._conn)
    own_peer = await discovery_stub.GetOwnId(Empty())
    return own_peer.peer_id

Returns the peer ID of the local runtime.

Returns

str
The peer ID of the runtime.
def get_kv_service(self) ‑> KVService
Expand source code
def get_kv_service(self) -> KVService:
    """
    Creates a handle to the distributed key-value store service.

    Returns
    -------
    KVService
        A handle to the distributed key-value store service.
    """
    kv_service = KVService(self._conn)
    return kv_service

Returns a handle to the distributed key-value store service.

Returns

KVService
A handle to the distributed key-value store service.
def get_local_kv_service(self) ‑> LocalKVService
Expand source code
def get_local_kv_service(self) -> LocalKVService:
    """
    Creates a handle to the local key-value store service.

    Returns
    -------
    LocalKVService
        A handle to the local key-value store service.
    """
    local_kv_service = LocalKVService(self._conn)
    return local_kv_service

Returns a handle to the local key-value store service.

Returns

LocalKVService
A handle to the local key-value store service.
def get_neighbours_service(self) ‑> NeighboursService
Expand source code
def get_neighbours_service(self) -> NeighboursService:
    """
    Creates a handle to the neighbours service.

    Returns
    -------
    NeighboursService
        A handle to the neighbours service.
    """
    neighbours_service = NeighboursService(self._conn)
    return neighbours_service

Returns a handle to the neighbours service.

Returns

NeighboursService
A handle to the neighbours service.
def get_pub_sub_service(self) ‑> PubSubService
Expand source code
def get_pub_sub_service(self) -> PubSubService:
    """
    Creates a handle to the pub-sub service.

    Returns
    -------
    PubSubService
        A handle to the pub-sub service.
    """
    pub_sub_service = PubSubService(self._conn)
    return pub_sub_service

Returns a handle to the pub-sub service.

Returns

PubSubService
A handle to the pub-sub service.
def get_request_response_service(self) ‑> RequestResponseService
Expand source code
def get_request_response_service(self) -> RequestResponseService:
    """
    Creates a handle to the request-response service.

    Returns
    -------
    RequestResponseService
        A handle to the request-response service.
    """
    request_response_service = RequestResponseService(self._conn)
    return request_response_service

Returns a handle to the request-response service.

Returns

RequestResponseService
A handle to the request-response service.
class PubSubService (conn: grpc.aio._base_channel.Channel)
Expand source code
class PubSubService:
    """
    A handle to the pub-sub service.

    Offers two operations: subscribing to a topic (`subscribe`) and publishing a
    message to a topic (`publish`).
    """

    def __init__(self, conn: Channel):
        pub_sub_stub = PubSubStub(conn)
        self.stub = pub_sub_stub

    def subscribe(self, topic: str) -> ManagedStream[PubSubRecvMessage]:
        """
        Subscribes to a topic and returns a stream of messages published to that topic.

        Parameters
        ----------
        topic : str
            The topic to subscribe to

        Returns
        -------
        stream : ManagedStream[PubSubRecvMessage]
            Stream of received messages from the pub-sub topic
        """
        return ManagedStream(self.stub.Subscribe(Topic(topic=topic)))

    async def publish(self, data: str | bytes, topic: str) -> bytes:
        """
        Publishes a message to a topic.

        Parameters
        ----------
        data : str | bytes
            Data to publish
        topic : str
            Topic to publish the data into

        Returns
        -------
        message_id : bytes
            ID of the sent message
        """
        message = PubSubMessage(data=Data(data=enc(data)), topic=Topic(topic=topic))
        message_id = await self.stub.Publish(message)
        return message_id.id

A handle to the pub-sub service.

Exposes methods to interact with the pub-sub service, like for subscribing to topics and publishing messages.

Methods

async def publish(self, data: str | bytes, topic: str) ‑> bytes
Expand source code
async def publish(self, data: str | bytes, topic: str) -> bytes:
    """
    Publishes a message to a topic.

    Parameters
    ----------
    data : str | bytes
        Data to publish
    topic : str
        Topic to publish the data into

    Returns
    -------
    message_id : bytes
        ID of the sent message
    """
    message = PubSubMessage(data=Data(data=enc(data)), topic=Topic(topic=topic))
    message_id = await self.stub.Publish(message)
    return message_id.id

Publishes a message to a topic.

Parameters

data : str | bytes
Data to publish
topic : str
Topic to publish the data into

Returns

gossip_sub_message_id : bytes
ID of the sent message
def subscribe(self, topic: str) ‑> ManagedStream[bridge_pb2.PubSubRecvMessage]
Expand source code
def subscribe(self, topic: str) -> ManagedStream[PubSubRecvMessage]:
    """
    Subscribes to a topic and returns a stream of messages published to that topic.

    Parameters
    ----------
    topic : str
        The topic to subscribe to

    Returns
    -------
    stream : ManagedStream[PubSubRecvMessage]
        Stream of received messages from the pub-sub topic
    """
    return ManagedStream(self.stub.Subscribe(Topic(topic=topic)))

Subscribes to a topic and returns a stream of messages published to that topic.

Parameters

topic : str
The topic to subscribe to

Returns

stream : ManagedStream[PubSubRecvMessage]
Stream of received messages from the pub-sub topic
class RequestResponseService (conn: grpc.aio._base_channel.Channel)
Expand source code
class RequestResponseService:
    """
    A handle to the request-response service.

    Offers three operations: sending a request to a peer (`send_request`), receiving
    requests (`receive`) and answering a received request (`respond`).
    """

    def __init__(self, conn: Channel):
        req_resp_stub = ReqRespStub(conn)
        self.stub = req_resp_stub

    async def send_request(
        self, peer_id: str, data: str | bytes, topic: Optional[str] = None
    ) -> Response:
        """
        Sends a request with an optional topic to a peer and returns the response.

        The peer must be subscribed to the topic in order to receive the request.
        If `topic` is `None`, the peer must be subscribed to `None` as well.

        Parameters
        ----------
        peer_id : str
            The peer_id of the target
        data : str | bytes
            Data to send
        topic : str, optional
            Topic the peer should be subscribed to if this argument is specified (default: None)

        Returns
        -------
        response : Response
            Response from peer `peer_id` to the sent request, awaited
        """
        # Without a topic the wrapper message is sent with its topic left unset.
        if topic is None:
            optional_topic = OptionalTopic()
        else:
            optional_topic = OptionalTopic(topic=Topic(topic=topic))

        message = Message(data=Data(data=enc(data)), topic=optional_topic)
        request = SendRequest(peer=Peer(peer_id=peer_id), msg=message)
        return await self.stub.Send(request)

    def receive(
        self,
        query: Optional[str] = None,
        regex: bool = False,
    ) -> ManagedStream[RecvRequest]:
        """
        Subscribes to a topic and returns a stream of received requests.

        Parameters
        ----------
        query : str, optional
            Either a topic subscribed to or a regex that describes topics if this argument is specified (default: None)
        regex : bool
            Query is specified as a regex, not a single `topic` string (default: False)

        Returns
        -------
        stream : ManagedStream[RecvRequest]
            Stream of received requests from the specified topic
        """
        # `regex` only matters when a query is given.
        if query is None:
            topic_query = OptionalTopicQuery()
        elif regex:
            topic_query = OptionalTopicQuery(query=TopicQuery(regex=query))
        else:
            topic_query = OptionalTopicQuery(
                query=TopicQuery(topic=Topic(topic=query))
            )

        return ManagedStream(self.stub.Recv(topic_query))

    async def respond(
        self, seq: int, data: str | bytes, error: Optional[str] = None
    ) -> None:
        """
        Respond to a request received from receive()

        Parameters
        ----------
        seq : int
            Sequence number for request-response matching
        data : str | bytes
            Response message data. If error is specified, this won't reach the peer
        error : str, optional
            Respond with an error message if an error occurred (default: None)
        """
        # An error takes precedence: `data` is only encoded when no error is given.
        if error is None:
            response = Response(data=Data(data=enc(data)))
        else:
            response = Response(error=error)

        await self.stub.Respond(SendResponse(seq=seq, response=response))

A handle to the request-response service.

Exposes methods to interact with the request-response service, like for sending and receiving requests, and for sending responses.

Methods

def receive(self, query: str | None = None, regex: bool = False) ‑> ManagedStream[bridge_pb2.RecvRequest]
Expand source code
def receive(
    self,
    query: Optional[str] = None,
    regex: bool = False,
) -> ManagedStream[RecvRequest]:
    """
    Subscribes to a topic and returns a stream of received requests.

    Parameters
    ----------
    query : str, optional
        Either a topic subscribed to or a regex that describes topics if this argument is specified (default: None)
    regex : bool
        Query is specified as a regex, not a single `topic` string (default: False)

    Returns
    -------
    stream : ManagedStream[RecvRequest]
        Stream of received requests from the specified topic
    """
    # `regex` only matters when a query is given.
    if query is None:
        topic_query = OptionalTopicQuery()
    elif regex:
        topic_query = OptionalTopicQuery(query=TopicQuery(regex=query))
    else:
        topic_query = OptionalTopicQuery(
            query=TopicQuery(topic=Topic(topic=query))
        )

    return ManagedStream(self.stub.Recv(topic_query))

Subscribes to a topic and returns a stream of received requests.

Parameters

query : str, optional
Either a topic subscribed to or a regex that describes topics if this argument is specified (default: None)
regex : bool
Query is specified as a regex, not a single topic string (default: False)

Returns

stream : ManagedStream[RecvRequest]
Stream of received requests from the specified topic
async def respond(self, seq: int, data: str | bytes, error: str | None = None) ‑> None
Expand source code
async def respond(
    self, seq: int, data: str | bytes, error: Optional[str] = None
) -> None:
    """
    Respond to a request received from receive()

    Parameters
    ----------
    seq : int
        Sequence number for request-response matching
    data : str | bytes
        Response message data. If error is specified, this won't reach the peer
    error : str, optional
        Respond with an error message if an error occurred (default: None)
    """
    # An error takes precedence: `data` is only encoded when no error is given.
    if error is None:
        response = Response(data=Data(data=enc(data)))
    else:
        response = Response(error=error)

    await self.stub.Respond(SendResponse(seq=seq, response=response))

Respond to a request received from receive()

Parameters

seq : int
Sequence number for request-response matching
data : str | bytes
Response message data. If error is specified, this won't reach the peer
error : str, optional
Respond with an error message if an error occurred (default: None)
async def send_request(self, peer_id: str, data: str | bytes, topic: str | None = None) ‑> bridge_pb2.Response
Expand source code
async def send_request(
    self, peer_id: str, data: str | bytes, topic: Optional[str] = None
) -> Response:
    """
    Sends a request with an optional topic to a peer and returns the response.

    The peer must be subscribed to the topic in order to receive the request.
    If `topic` is `None`, the peer must be subscribed to `None` as well.

    Parameters
    ----------
    peer_id : str
        The peer_id of the target
    data : str | bytes
        Data to send
    topic : str, optional
        Topic the peer should be subscribed to if this argument is specified (default: None)

    Returns
    -------
    response : Response
        Response from peer `peer_id` to the sent request, awaited
    """
    # Without a topic the wrapper message is sent with its topic left unset.
    if topic is None:
        optional_topic = OptionalTopic()
    else:
        optional_topic = OptionalTopic(topic=Topic(topic=topic))

    message = Message(data=Data(data=enc(data)), topic=optional_topic)
    request = SendRequest(peer=Peer(peer_id=peer_id), msg=message)
    return await self.stub.Send(request)

Sends a request with an optional topic to a peer and returns the response.

The peer must be subscribed to the topic in order to receive the request. If topic is None, the peer must be subscribed to None as well.

Parameters

peer_id : str
The peer_id of the target
data : str | bytes
Data to send
topic : str, optional
Topic the peer should be subscribed to if this argument is specified (default: None)

Returns

response : Response
Response from peer peer_id to the sent request, awaited