Package hyveos_sdk
Sub-modules
hyveos_sdk.connection
hyveos_sdk.protocol
hyveos_sdk.services
Classes
class AppsService (conn: grpc.aio._base_channel.Channel)
-
Expand source code
class AppsService:
    """
    A handle to the application management service.

    Exposes methods to interact with the application management service,
    like for deploying and stopping apps on peers in the network.
    """

    def __init__(self, conn: Channel):
        self.stub = AppsStub(conn)
        # A single Empty message is reused for requests without arguments.
        self.empty = Empty()

    async def deploy(
        self,
        image: str,
        local: bool,
        ports: Iterable[int] = (),
        peer_id: Optional[str] = None,
    ) -> str:
        """
        Deploys an application in a docker image to a peer in the network.

        Returns the ULID of the deployed application.
        To deploy to self, leave the peer_id argument empty.

        Parameters
        ----------
        image : str
            A docker image name, can contain a tag, e.g. `my-docker-image:latest`
        local : bool
            Whether the image is available locally, without needing to pull it
            from a registry
        ports : Iterable[int], optional
            Ports to expose on the container (default: no ports)
        peer_id : str, optional
            The peer_id of the target node or None to deploy to self
            (default: None)

        Returns
        -------
        app_id : str
            The id of the deployed application
        """
        peer = Peer(peer_id=peer_id) if peer_id is not None else None

        deployed = await self.stub.Deploy(
            DeployAppRequest(
                app=DockerApp(image=DockerImage(name=image), ports=ports),
                local=local,
                peer=peer,
            )
        )
        return deployed.ulid

    async def list_running(self, peer_id: Optional[str] = None) -> Iterable[RunningApp]:
        """
        Lists the running apps on a peer in the network.

        To list the running apps on self, leave the peer_id argument empty.

        Parameters
        ----------
        peer_id : str, optional
            The peer_id of the target node or None to list apps on self
            (default: None)

        Returns
        -------
        apps : Iterable[RunningApp]
            The `apps` field of the service response, i.e. the running apps
            (not just their ids)
        """
        peer = Peer(peer_id=peer_id) if peer_id is not None else None

        response = await self.stub.ListRunning(ListRunningAppsRequest(peer=peer))
        return response.apps

    async def stop(self, app_id: str, peer_id: Optional[str] = None):
        """
        Stops a running app with an ID on a peer in the network.

        To stop the running app on self, leave the peer_id argument empty.

        Parameters
        ----------
        app_id : str
            The id of the app to stop
        peer_id : str, optional
            The peer_id of the target node or None to stop the app on self
            (default: None)
        """
        peer = Peer(peer_id=peer_id) if peer_id is not None else None

        await self.stub.Stop(StopAppRequest(id=ID(ulid=app_id), peer=peer))

    async def get_own_app_id(self) -> str:
        """
        Get the ID of the current app.

        This can only be called from a running app.

        Returns
        -------
        app_id : str
            The id of the current app
        """
        own_id = await self.stub.GetOwnAppId(self.empty)
        return own_id.ulid
A handle to the application management service.
Exposes methods to interact with the application management service, like for deploying and stopping apps on peers in the network.
Methods
async def deploy(self,
image: str,
local: bool,
ports: Iterable[int] = [],
peer_id: str | None = None) ‑> str-
Expand source code
async def deploy(
    self,
    image: str,
    local: bool,
    ports: Iterable[int] = (),
    peer_id: Optional[str] = None,
) -> str:
    """
    Deploys an application in a docker image to a peer in the network.

    Returns the ULID of the deployed application.
    To deploy to self, leave the peer_id argument empty.

    Parameters
    ----------
    image : str
        A docker image name, can contain a tag, e.g. `my-docker-image:latest`
    local : bool
        Whether the image is available locally, without needing to pull it
        from a registry
    ports : Iterable[int], optional
        Ports to expose on the container (default: no ports)
    peer_id : str, optional
        The peer_id of the target node or None to deploy to self
        (default: None)

    Returns
    -------
    app_id : str
        The id of the deployed application
    """
    peer = Peer(peer_id=peer_id) if peer_id is not None else None

    # `deployed` (rather than `id`) avoids shadowing the builtin.
    deployed = await self.stub.Deploy(
        DeployAppRequest(
            app=DockerApp(image=DockerImage(name=image), ports=ports),
            local=local,
            peer=peer,
        )
    )
    return deployed.ulid
Deploys an application in a docker image to a peer in the network.
Returns the ULID of the deployed application.
To deploy to self, leave the peer_id argument empty.
Parameters
image
:str
- A docker image name, can contain a tag, e.g. `my-docker-image:latest`
local
:bool
- Whether the image is available locally, without needing to pull it from a registry
ports
:Iterable[int]
, optional- Ports to expose on the container (default: [])
peer_id
:str
, optional- The peer_id of the target node or None to deploy to self (default: None)
Returns
app_id
:str
- The id of the deployed application
async def get_own_app_id(self) ‑> str
-
Expand source code
async def get_own_app_id(self) -> str:
    """
    Get the ID of the current app.

    This can only be called from a running app.

    Returns
    -------
    app_id : str
        The ULID string carried by the service response
    """
    own_id = await self.stub.GetOwnAppId(self.empty)
    return own_id.ulid
Get the ID of the current app.
This can only be called from a running app.
Returns
app_id
:str
- The id of the current app
async def list_running(self, peer_id: str | None = None) ‑> Iterable[bridge_pb2.RunningApp]
-
Expand source code
async def list_running(self, peer_id: Optional[str] = None) -> Iterable[RunningApp]:
    """
    Lists the running apps on a peer in the network.

    To list the running apps on self, leave the peer_id argument empty.

    Parameters
    ----------
    peer_id : str, optional
        The peer_id of the target node or None to list apps on self
        (default: None)

    Returns
    -------
    apps : Iterable[RunningApp]
        The `apps` field of the service response, i.e. the running apps
        (not just their ids)
    """
    peer = Peer(peer_id=peer_id) if peer_id is not None else None

    response = await self.stub.ListRunning(ListRunningAppsRequest(peer=peer))
    return response.apps
Lists the running apps on a peer in the network.
To list the running apps on self, leave the peer_id argument empty.
Parameters
peer_id
:str
, optional- The peer_id of the target node or None to list apps on self (default: None)
Returns
apps
:Iterable[RunningApp]
- The running applications
async def stop(self, app_id: str, peer_id: str | None = None)
-
Expand source code
async def stop(self, app_id: str, peer_id: Optional[str] = None):
    """
    Stops a running app with an ID on a peer in the network.

    To stop the running app on self, leave the peer_id argument empty.

    Parameters
    ----------
    app_id : str
        The id of the app to stop
    peer_id : str, optional
        The peer_id of the target node or None to stop the app on self
        (default: None)
    """
    target = None if peer_id is None else Peer(peer_id=peer_id)
    request = StopAppRequest(id=ID(ulid=app_id), peer=target)
    await self.stub.Stop(request)
Stops a running app with an ID on a peer in the network.
To stop the running app on self, leave the peer_id argument empty.
Parameters
app_id
:str
- The id of the app to stop
peer_id
:str
, optional- The peer_id of the target node or None to stop the app on self (default: None)
class Connection (socket_path: pathlib.Path | str | None = None,
shared_dir_path: pathlib.Path | str | None = None,
uri: str | None = None)-
Expand source code
class Connection:
    """
    A connection to the HyveOS runtime.

    This class is used to establish a connection to the HyveOS runtime.
    It is used as a context manager to ensure that the connection is
    properly closed when it is no longer needed.

    By default, the connection to the HyveOS runtime will be made through the
    application bridge, i.e., the Unix domain socket specified by the
    `HYVEOS_BRIDGE_SOCKET` environment variable will be used to communicate
    with the runtime.
    If another connection type is desired, you can specify either the
    `socket_path` and `shared_dir_path` parameters, or the `uri` parameter
    when creating the connection.

    Example
    -------

    ```python
    from hyveos_sdk import Connection

    async def main():
        async with Connection() as conn:
            peer_id = await conn.get_id()

            print(f'My peer ID: {peer_id}')
    ```
    """

    _conn: grpc.aio.Channel
    _shared_dir_path: Optional[Path]
    _uri: Optional[str]
    _session: Optional[aiohttp.ClientSession]

    def __init__(
        self,
        socket_path: Optional[Path | str] = None,
        shared_dir_path: Optional[Path | str] = None,
        uri: Optional[str] = None,
    ):
        """
        Establishes a connection to the HyveOS runtime.

        By default, the connection to the HyveOS runtime will be made through
        the application bridge, i.e., the Unix domain socket specified by the
        `HYVEOS_BRIDGE_SOCKET` environment variable will be used to
        communicate with the runtime.
        If another connection type is desired, you can specify either the
        `socket_path` and `shared_dir_path` parameters, or the `uri`
        parameter.

        Parameters
        ----------
        socket_path : Path | str, optional
            A custom path to a Unix domain socket to connect to.
            The socket path should point to a Unix domain socket that the
            HyveOS runtime is listening on.
            Mutually exclusive with `uri`. If `socket_path` is provided,
            `shared_dir_path` must also be provided.
        shared_dir_path : Path | str, optional
            A path to a directory where the runtime expects files provided
            with the file-transfer service to be stored.
            Mutually exclusive with `uri`. Must be provided if `socket_path`
            is provided.
        uri : str, optional
            A URI to connect to over the network.
            The URI should be in the format `http://<host>:<port>`.
            A HyveOS runtime should be listening at the given address.
            Mutually exclusive with `socket_path` and `shared_dir_path`.

        Raises
        ------
        ValueError
            If `socket_path` is provided without `shared_dir_path`, if
            `shared_dir_path` is provided without `socket_path`, or if `uri`
            is combined with either of them.
        FileNotFoundError
            If `shared_dir_path` is provided but does not exist (it is
            resolved strictly, before the argument checks).
        KeyError
            If no argument is provided and the `HYVEOS_BRIDGE_SOCKET`
            environment variable is not set.
        """
        shared_dir_path = (
            Path(shared_dir_path)
            if isinstance(shared_dir_path, str)
            else shared_dir_path
        )

        self._shared_dir_path = (
            shared_dir_path.resolve(strict=True)
            if shared_dir_path is not None
            else None
        )
        self._uri = uri
        self._session = None

        if socket_path is not None:
            if shared_dir_path is None:
                raise ValueError(
                    '`shared_dir_path` must be provided when `socket_path` is provided'
                )
            if uri is not None:
                raise ValueError(
                    'Only one of `socket_path` and `shared_dir_path`, or `uri` can be provided'
                )

            self._conn = grpc.aio.insecure_channel(
                f'unix://{socket_path}',
                options=(('grpc.default_authority', 'localhost'),),
            )
        elif uri is not None:
            if shared_dir_path is not None:
                raise ValueError(
                    '`shared_dir_path` cannot be provided when `uri` is provided'
                )

            self._conn = grpc.aio.insecure_channel(uri)
            # Only the network (uri) mode opens an HTTP session.
            self._session = aiohttp.ClientSession()
        elif shared_dir_path is not None:
            raise ValueError(
                '`shared_dir_path` cannot be provided without `socket_path`'
            )
        else:
            bridge_socket_path = os.environ['HYVEOS_BRIDGE_SOCKET']

            self._conn = grpc.aio.insecure_channel(
                f'unix://{bridge_socket_path}',
                options=(('grpc.default_authority', 'localhost'),),
            )

    async def __aenter__(self) -> 'OpenedConnection':
        return OpenedConnection(self)

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Close the HTTP session even when closing the channel fails, so a
        # failing channel shutdown cannot leak the session.
        try:
            await self._conn.close()
        finally:
            if self._session is not None:
                await self._session.close()
A connection to the hyveOS runtime.
This class is used to establish a connection to the HyveOS runtime. It is used as a context manager to ensure that the connection is properly closed when it is no longer needed.
By default, the connection to the HyveOS runtime will be made through the application bridge, i.e., the Unix domain socket specified by the `HYVEOS_BRIDGE_SOCKET` environment variable will be used to communicate with the runtime.
If another connection type is desired, you can specify either the `socket_path` and `shared_dir_path` parameters, or the `uri` parameter when creating the connection.
Example
from hyveos_sdk import Connection async def main(): async with Connection() as conn: peer_id = await conn.get_id() print(f'My peer ID: {peer_id}')
Establishes a connection to the HyveOS runtime.
By default, the connection to the HyveOS runtime will be made through the application bridge, i.e., the Unix domain socket specified by the `HYVEOS_BRIDGE_SOCKET` environment variable will be used to communicate with the runtime.
If another connection type is desired, you can specify either the `socket_path` and `shared_dir_path` parameters, or the `uri` parameter.
Parameters
socket_path
:Path | str
, optional-
A custom path to a Unix domain socket to connect to. The socket path should point to a Unix domain socket that the HyveOS runtime is listening on.
Mutually exclusive with `uri`. If `socket_path` is provided, `shared_dir_path` must also be provided.
shared_dir_path
:Path | str
, optional-
A path to a directory where the runtime expects files provided with the file-transfer service to be stored.
Mutually exclusive with `uri`. Must be provided if `socket_path` is provided.
uri
:str
, optional-
A URI to connect to over the network. The URI should be in the format `http://<host>:<port>`. A HyveOS runtime should be listening at the given address.
Mutually exclusive with `socket_path` and `shared_dir_path`.
Raises
ValueError
- If both `socket_path` and `uri` are provided.
class DebugService (conn: grpc.aio._base_channel.Channel)
-
Expand source code
class DebugService:
    """
    A handle to the debug service.

    Exposes methods to interact with the debug service, such as subscribing
    to mesh topology events and message debug events.
    """

    def __init__(self, conn: Channel):
        self.stub = DebugStub(conn)

    def subscribe_mesh_topology(self) -> ManagedStream[MeshTopologyEvent]:
        """
        Subscribes to mesh topology events.

        Returns a stream of mesh topology events. The stream will emit an
        event whenever the mesh topology changes.

        For each peer in the mesh, it is guaranteed that the stream will
        first emit an `init` event when it enters the mesh, followed only by
        `discovered` and `lost` events, until the peer leaves the mesh.

        Returns
        -------
        stream : ManagedStream[MeshTopologyEvent]
            A stream of mesh topology events
        """
        return ManagedStream(self.stub.SubscribeMeshTopology(Empty()))

    def subscribe_messages(self) -> ManagedStream[MessageDebugEvent]:
        """
        Subscribes to message debug events.

        Returns a stream of mesh debug events. The stream will emit an event
        whenever a request, response, or gossipsub message is sent by a peer
        in the mesh.

        Returns
        -------
        stream : ManagedStream[MessageDebugEvent]
            A stream of message debug events
        """
        return ManagedStream(self.stub.SubscribeMessages(Empty()))
A handle to the debug service.
Exposes methods to interact with the debug service, such as subscribing to mesh topology events and message debug events.
Methods
def subscribe_mesh_topology(self) ‑> ManagedStream[bridge_pb2.MeshTopologyEvent]
-
Expand source code
def subscribe_mesh_topology(self) -> ManagedStream[MeshTopologyEvent]:
    """
    Subscribes to mesh topology events.

    Returns a stream of mesh topology events. The stream will emit an event
    whenever the mesh topology changes.

    For each peer in the mesh, it is guaranteed that the stream will first
    emit an `init` event when it enters the mesh, followed only by
    `discovered` and `lost` events, until the peer leaves the mesh.

    Returns
    -------
    stream : ManagedStream[MeshTopologyEvent]
        A stream of mesh topology events
    """
    return ManagedStream(self.stub.SubscribeMeshTopology(Empty()))
Subscribes to mesh topology events.
Returns a stream of mesh topology events. The stream will emit an event whenever the mesh topology changes.
For each peer in the mesh, it is guaranteed that the stream will first emit an `init` event when it enters the mesh, followed only by `discovered` and `lost` events, until the peer leaves the mesh.
Returns
stream
:ManagedStream[MeshTopologyEvent]
- A stream of mesh topology events
def subscribe_messages(self) ‑> ManagedStream[bridge_pb2.MessageDebugEvent]
-
Expand source code
def subscribe_messages(self) -> ManagedStream[MessageDebugEvent]:
    """
    Subscribes to message debug events.

    Returns a stream of mesh debug events. The stream will emit an event
    whenever a request, response, or gossipsub message is sent by a peer in
    the mesh.

    Returns
    -------
    stream : ManagedStream[MessageDebugEvent]
        A stream of message debug events
    """
    return ManagedStream(self.stub.SubscribeMessages(Empty()))
Subscribes to message debug events.
Returns a stream of mesh debug events. The stream will emit an event whenever a request, response, or gossipsub message is sent by a peer in the mesh.
Returns
stream
:ManagedStream[MessageDebugEvent]
- A stream of message debug events
class DiscoveryService (conn: grpc.aio._base_channel.Channel)
-
Expand source code
class DiscoveryService:
    """
    A handle to the discovery service.

    Exposes methods to interact with the discovery service, like for marking
    the local runtime as a provider for a discovery key or getting the
    providers for a discovery key.
    """

    def __init__(self, conn: Channel):
        self.stub = DiscoveryStub(conn)

    async def provide(self, topic: str, key: str | bytes) -> None:
        """
        Marks the local runtime as a provider for a discovery key.

        Parameters
        ----------
        topic : str
            The topic of the discovery key
        key : str | bytes
            The discovery key; it is passed through `enc` before being sent
        """
        dht_key = DHTKey(topic=Topic(topic=topic), key=enc(key))
        await self.stub.Provide(dht_key)

    def get_providers(self, topic: str, key: str | bytes) -> ManagedStream[Peer]:
        """
        Gets the providers for a discovery key.

        Parameters
        ----------
        topic : str
            The topic of the discovery key
        key : str | bytes
            The discovery key; it is passed through `enc` before being sent

        Returns
        -------
        stream : ManagedStream[Peer]
            A stream of providers for the discovery key.
        """
        dht_key = DHTKey(topic=Topic(topic=topic), key=enc(key))
        return ManagedStream(self.stub.GetProviders(dht_key))

    async def stop_providing(self, topic: str, key: str | bytes) -> None:
        """
        Stops providing a discovery key.

        Parameters
        ----------
        topic : str
            The topic of the discovery key
        key : str | bytes
            The discovery key; it is passed through `enc` before being sent
        """
        dht_key = DHTKey(topic=Topic(topic=topic), key=enc(key))
        await self.stub.StopProviding(dht_key)
A handle to the discovery service.
Exposes methods to interact with the discovery service, like for marking the local runtime as a provider for a discovery key or getting the providers for a discovery key.
Methods
def get_providers(self, topic: str, key: str | bytes) ‑> ManagedStream[bridge_pb2.Peer]
-
Expand source code
def get_providers(self, topic: str, key: str | bytes) -> ManagedStream[Peer]:
    """
    Gets the providers for a discovery key.

    Parameters
    ----------
    topic : str
        The topic of the discovery key
    key : str | bytes
        The discovery key; it is passed through `enc` before being sent

    Returns
    -------
    stream : ManagedStream[Peer]
        A stream of providers for the discovery key.
    """
    dht_key = DHTKey(topic=Topic(topic=topic), key=enc(key))
    return ManagedStream(self.stub.GetProviders(dht_key))
Gets the providers for a discovery key.
Returns
stream
:ManagedStream[Peer]
- A stream of providers for the discovery key.
async def provide(self, topic: str, key: str | bytes) ‑> None
-
Expand source code
async def provide(self, topic: str, key: str | bytes) -> None:
    """
    Marks the local runtime as a provider for a discovery key.

    Parameters
    ----------
    topic : str
        The topic of the discovery key
    key : str | bytes
        The discovery key; it is passed through `enc` before being sent
    """
    dht_key = DHTKey(topic=Topic(topic=topic), key=enc(key))
    await self.stub.Provide(dht_key)
Marks the local runtime as a provider for a discovery key.
async def stop_providing(self, topic: str, key: str | bytes) ‑> None
-
Expand source code
async def stop_providing(self, topic: str, key: str | bytes) -> None:
    """
    Stops providing a discovery key.

    Parameters
    ----------
    topic : str
        The topic of the discovery key
    key : str | bytes
        The discovery key; it is passed through `enc` before being sent
    """
    dht_key = DHTKey(topic=Topic(topic=topic), key=enc(key))
    await self.stub.StopProviding(dht_key)
Stops providing a discovery key.
class FileTransferService
-
Expand source code
class FileTransferService(ABC):
    """
    A handle to the file transfer service.

    Exposes methods to interact with the file transfer service, like for
    publishing and getting files. This is an abstract interface; both
    methods must be supplied by a concrete subclass.
    """

    @abstractmethod
    async def publish(self, file_path: Path | str) -> FileToken:
        """
        Publishes a file in the mesh network and returns its content ID.

        Before it's published, the file is copied to the shared directory if
        it is not already there. By default, the shared directory is defined
        by the `HYVEOS_BRIDGE_SHARED_DIR` environment variable. However, it
        can be set to a custom path when initializing the connection to the
        HyveOS runtime.

        Parameters
        ----------
        file_path : Path | str
            The local path to the file to publish

        Returns
        -------
        file_token : FileToken
            The content ID of the published file
        """

    @abstractmethod
    async def get(self, file_token: FileToken) -> FilePath:
        """
        Retrieves a file from the mesh network and returns its path.

        When the local runtime doesn't own a copy of this file yet, it
        downloads it from one of its peers. Afterwards, or if it was already
        locally available, the file is copied into the shared directory,
        which is defined by the `HYVEOS_BRIDGE_SHARED_DIR` environment
        variable.

        Parameters
        ----------
        file_token : FileToken
            The content ID of the file to retrieve

        Returns
        -------
        file_path : FilePath
            The local path to the retrieved file
        """
A handle to the file transfer service.
Exposes methods to interact with the file transfer service, like for publishing and getting files.
Ancestors
- abc.ABC
Subclasses
Methods
async def get(self,
file_token: FileToken) ‑> bridge_pb2.FilePath-
Expand source code
@abstractmethod
async def get(self, file_token: FileToken) -> FilePath:
    """
    Retrieves a file from the mesh network and returns its path.

    When the local runtime doesn't own a copy of this file yet, it downloads
    it from one of its peers. Afterwards, or if it was already locally
    available, the file is copied into the shared directory, which is
    defined by the `HYVEOS_BRIDGE_SHARED_DIR` environment variable.

    Parameters
    ----------
    file_token : FileToken
        The content ID of the file to retrieve

    Returns
    -------
    file_path : FilePath
        The local path to the retrieved file
    """
Retrieves a file from the mesh network and returns its path.
When the local runtime doesn't own a copy of this file yet, it downloads it from one of its peers. Afterwards, or if it was already locally available, the file is copied into the shared directory, which is defined by the `HYVEOS_BRIDGE_SHARED_DIR` environment variable.
Parameters
file_token
:FileToken
- The content ID of the file to retrieve
Returns
file_path
:FilePath
- The local path to the retrieved file
async def publish(self, file_path: pathlib.Path | str) ‑> FileToken
-
Expand source code
@abstractmethod
async def publish(self, file_path: Path | str) -> FileToken:
    """
    Publishes a file in the mesh network and returns its content ID.

    Before it's published, the file is copied to the shared directory if it
    is not already there. By default, the shared directory is defined by the
    `HYVEOS_BRIDGE_SHARED_DIR` environment variable. However, it can be set
    to a custom path when initializing the connection to the HyveOS runtime.

    Parameters
    ----------
    file_path : Path | str
        The local path to the file to publish

    Returns
    -------
    file_token : FileToken
        The content ID of the published file
    """
Publishes a file in the mesh network and returns its content ID.
Before it's published, the file is copied to the shared directory if it is not already there. By default, the shared directory is defined by the `HYVEOS_BRIDGE_SHARED_DIR` environment variable. However, it can be set to a custom path when initializing the connection to the HyveOS runtime.
Parameters
file_path
:Path | str
- The local path to the file to publish
Returns
file_token
:FileToken
- The content ID of the published file
class KVService (conn: grpc.aio._base_channel.Channel)
-
Expand source code
class KVService:
    """
    A handle to the distributed key-value store service.

    Exposes methods to interact with the key-value store service, like for
    putting records into the key-value store or getting records from it.
    """

    def __init__(self, conn: Channel):
        self.stub = KVStub(conn)

    async def put_record(self, topic: str, key: str | bytes, value: str | bytes) -> None:
        """
        Puts a record into the key-value store.

        Parameters
        ----------
        topic : str
            The topic of the record
        key : str | bytes
            The key of the record
        value : str | bytes
            The value of the record; like the key, it is passed through
            `enc` before being sent
        """
        await self.stub.PutRecord(
            DHTRecord(
                key=DHTKey(topic=Topic(topic=topic), key=enc(key)),
                value=Data(data=enc(value)),
            )
        )

    async def get_record(self, topic: str, key: str | bytes) -> bytes | None:
        """
        Gets a record from the key-value store.

        Parameters
        ----------
        topic : str
            The topic of the record to retrieve
        key : str | bytes
            The key of the record to retrieve

        Returns
        -------
        value : bytes | None
            The value of the record or `None` if the record is not found
        """
        record = await self.stub.GetRecord(
            DHTKey(topic=Topic(topic=topic), key=enc(key))
        )
        # Reading an unset protobuf message field yields an empty default
        # message, never None, so presence has to be tested with HasField.
        if record.HasField('data'):
            return record.data.data
        return None

    async def remove_record(self, topic: str, key: str | bytes) -> None:
        """
        Removes a record from the key-value store.

        This only applies to the local node and only affects the network once
        the record expires.

        Parameters
        ----------
        topic : str
            The topic of the record to remove
        key : str | bytes
            The key of the record to remove
        """
        await self.stub.RemoveRecord(DHTKey(topic=Topic(topic=topic), key=enc(key)))
A handle to the distributed key-value store service.
Exposes methods to interact with the key-value store service, like for putting records into the key-value store or getting records from it.
Methods
async def get_record(self, topic: str, key: str | bytes) ‑> bytes | None
-
Expand source code
async def get_record(self, topic: str, key: str | bytes) -> bytes | None:
    """
    Gets a record from the key-value store.

    Parameters
    ----------
    topic : str
        The topic of the record to retrieve
    key : str | bytes
        The key of the record to retrieve

    Returns
    -------
    value : bytes | None
        The value of the record or `None` if the record is not found
    """
    record = await self.stub.GetRecord(
        DHTKey(topic=Topic(topic=topic), key=enc(key))
    )
    # Reading an unset protobuf message field yields an empty default
    # message, never None, so presence has to be tested with HasField.
    if record.HasField('data'):
        return record.data.data
    return None
Gets a record from the key-value store.
Parameters
topic
:str
- The topic of the record to retrieve
key
:str | bytes
- The key of the record to retrieve
Returns
value
:bytes | None
- The value of the record or `None` if the record is not found
async def put_record(self, topic: str, key: str | bytes, value: str) ‑> None
-
Expand source code
async def put_record(self, topic: str, key: str | bytes, value: str) -> None: """ Puts a record into the key-value store. Parameters ---------- topic : str The topic of the record key : str | bytes The key of the record value : str The value of the record """ await self.stub.PutRecord( DHTRecord( key=DHTKey(topic=Topic(topic=topic), key=enc(key)), value=Data(data=enc(value)), ) )
Puts a record into the key-value store.
Parameters
topic
:str
- The topic of the record
key
:str | bytes
- The key of the record
value
:str
- The value of the record
async def remove_record(self, topic: str, key: str | bytes) ‑> None
-
Expand source code
async def remove_record(self, topic: str, key: str | bytes) -> None:
    """
    Removes a record from the key-value store.

    This only applies to the local node and only affects the network once
    the record expires.

    Parameters
    ----------
    topic : str
        The topic of the record to remove
    key : str | bytes
        The key of the record to remove
    """
    dht_key = DHTKey(topic=Topic(topic=topic), key=enc(key))
    await self.stub.RemoveRecord(dht_key)
Removes a record from the key-value store.
This only applies to the local node and only affects the network once the record expires.
class LocalKVService (conn: grpc.aio._base_channel.Channel)
-
Expand source code
class LocalKVService:
    """
    A handle to the local key-value store service.

    Exposes methods to interact with the key-value store service, like
    putting and getting key-value records.
    The key-value store is local to the runtime and is not shared with other
    runtimes. However, it is persisted across restarts of the runtime.
    """

    def __init__(self, conn: Channel):
        self.stub = LocalKVStub(conn)

    async def put(self, key: str, value: str | bytes) -> bytes | None:
        """
        Puts a record into the key-value store.

        Returns the previous value if it exists, otherwise `None`.
        This only has local effects and does not affect other runtimes.
        However, the record is persisted across restarts of the runtime.

        Parameters
        ----------
        key : str
            The key of the record
        value : str | bytes
            The value of the record

        Returns
        -------
        value : bytes | None
            The previous value of the record or `None` if it did not exist
        """
        record = await self.stub.Put(
            LocalKVRecord(key=key, value=Data(data=enc(value)))
        )
        # Reading an unset protobuf message field yields an empty default
        # message, never None, so presence has to be tested with HasField.
        if record.HasField('data'):
            return record.data.data
        return None

    async def get(self, key: str) -> bytes | None:
        """
        Gets a record from the key-value store if it exists.

        This will not return values from other runtimes.

        Parameters
        ----------
        key : str
            The key of the record to retrieve

        Returns
        -------
        value : bytes | None
            The value of the record or `None` if the record is not found
        """
        record = await self.stub.Get(LocalKVKey(key=key))
        # See `put`: an unset message field is never None in protobuf.
        if record.HasField('data'):
            return record.data.data
        return None
A handle to the local key-value store service.
Exposes methods to interact with the key-value store service, like putting and getting key-value records. The key-value store is local to the runtime and is not shared with other runtimes. However, it is persisted across restarts of the runtime.
Methods
async def get(self, key: str) ‑> bytes | None
-
Expand source code
async def get(self, key: str) -> bytes | None:
    """
    Gets a record from the key-value store if it exists.

    This will not return values from other runtimes.

    Parameters
    ----------
    key : str
        The key of the record to retrieve

    Returns
    -------
    value : bytes | None
        The value of the record or `None` if the record is not found
    """
    record = await self.stub.Get(LocalKVKey(key=key))
    # Reading an unset protobuf message field yields an empty default
    # message, never None, so presence has to be tested with HasField.
    if record.HasField('data'):
        return record.data.data
    return None
Gets a record from the key-value store if it exists.
This will not return values from other runtimes.
Parameters
key
:str
- The key of the record to retrieve
Returns
value
:bytes | None
- The value of the record or `None` if the record is not found
async def put(self, key: str, value: str | bytes) ‑> bytes | None
-
Expand source code
async def put(self, key: str, value: str | bytes) -> bytes | None:
    """
    Puts a record into the key-value store.

    Returns the previous value if it exists, otherwise `None`.
    This only has local effects and does not affect other runtimes.
    However, the record is persisted across restarts of the runtime.

    Parameters
    ----------
    key : str
        The key of the record
    value : str | bytes
        The value of the record

    Returns
    -------
    value : bytes | None
        The previous value of the record or `None` if it did not exist
    """
    record = await self.stub.Put(
        LocalKVRecord(key=key, value=Data(data=enc(value)))
    )
    # Reading an unset protobuf message field yields an empty default
    # message, never None, so presence has to be tested with HasField.
    if record.HasField('data'):
        return record.data.data
    return None
Puts a record into the key-value store.
Returns the previous value if it exists, otherwise `None`. This only has local effects and does not affect other runtimes. However, the record is persisted across restarts of the runtime.
Parameters
key
:str
- The key of the record
value
:str | bytes
- The value of the record
Returns
value
:bytes | None
- The previous value of the record or `None` if it did not exist
class ManagedStream (stream)
-
Expand source code
class ManagedStream(Generic[T]):
    """
    An async-iterable wrapper around a stream that cancels it on exit.

    Used as an async context manager: entering returns the wrapper itself,
    and leaving the `async with` block calls `cancel()` on the wrapped
    stream. Iterating the wrapper iterates the wrapped stream.

    Parameters
    ----------
    stream
        The stream to wrap. It must provide `cancel()` and `__aiter__()`.
    """

    def __init__(self, stream):
        self.stream = stream

    async def __aenter__(self) -> 'ManagedStream[T]':
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        # Cancel unconditionally, whether or not the block raised.
        self.stream.cancel()

    def __aiter__(self) -> AsyncIterator[T]:
        return self.stream.__aiter__()
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.
This class can then be used as follows::
def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
Ancestors
- typing.Generic
class NeighboursService (conn: grpc.aio._base_channel.Channel)
-
Expand source code
class NeighboursService:
    """
    A handle to the neighbours service.

    Offers a subscription to neighbour events and a query for the peer IDs
    of the current neighbours.
    """

    def __init__(self, conn: Channel):
        self.stub = NeighboursStub(conn)
        # One shared empty request message, reused by every call below.
        self.empty = Empty()

    def subscribe(self) -> ManagedStream[NeighbourEvent]:
        """
        Subscribes to neighbour events.

        The returned stream emits an event whenever the local runtime
        detects a change in the set of neighbours. It is guaranteed to emit
        an `init` event directly after subscribing and only `discovered`
        and `lost` events afterwards.

        Returns
        -------
        stream : ManagedStream[NeighbourEvent]
            A stream of neighbour events
        """
        return ManagedStream(self.stub.Subscribe(self.empty))

    async def get(self) -> list[str]:
        """
        Returns the peer IDs of the current neighbours.

        Returns
        -------
        peers : list[str]
            A list of peer IDs of the current neighbours
        """
        reply = await self.stub.Get(self.empty)
        return [neighbour.peer_id for neighbour in reply.peers]
A handle to the neighbours service.
Exposes methods to interact with the neighbours service, such as subscribing to neighbour events and getting the current set of neighbours.
Methods
async def get(self) ‑> list[str]
-
Expand source code
async def get(self) -> list[str]:
    """
    Returns the peer IDs of the current neighbours.

    Sends the stored empty request through the service stub and collects
    the `peer_id` of every peer in the reply.

    Returns
    -------
    peers : list[str]
        A list of peer IDs of the current neighbours
    """
    reply = await self.stub.Get(self.empty)
    return [neighbour.peer_id for neighbour in reply.peers]
Returns the peer IDs of the current neighbours.
Returns
peers
:list[str]
- A list of peer IDs of the current neighbours
def subscribe(self) ‑> ManagedStream[bridge_pb2.NeighbourEvent]
-
Expand source code
def subscribe(self) -> ManagedStream[NeighbourEvent]:
    """
    Subscribes to neighbour events.

    The returned stream emits an event whenever the local runtime detects a
    change in the set of neighbours. It is guaranteed to emit an `init`
    event directly after subscribing and only `discovered` and `lost`
    events afterwards.

    Returns
    -------
    stream : ManagedStream[NeighbourEvent]
        A stream of neighbour events
    """
    return ManagedStream(self.stub.Subscribe(self.empty))
Subscribes to neighbour events.
Returns a stream of neighbour events. The stream will emit an event whenever the local runtime detects a change in the set of neighbours. The stream is guaranteed to emit an `init` event directly after subscribing and only `discovered` and `lost` events afterwards.
Returns
stream
:ManagedStream[NeighbourEvent]
- A stream of neighbour events
class OpenedConnection (conn: Connection)
-
Expand source code
class OpenedConnection:
    """
    An opened connection to the HyveOS runtime.

    Gives access to the services provided by HyveOS through one accessor
    method per service.

    An instance of this class is obtained by entering a `Connection` context
    manager.

    Example
    -------
    ```python
    from hyveos_sdk import Connection

    async def main():
        async with Connection() as conn:
            peer_id = await conn.get_id()
            print(f'My peer ID: {peer_id}')
    ```
    """

    _conn: grpc.aio.Channel
    _shared_dir_path: Optional[Path]
    _uri: Optional[str]
    _session: Optional[aiohttp.ClientSession]

    def __init__(self, conn: Connection):
        # Copy over the state held by the `Connection` this was opened from.
        self._conn = conn._conn
        self._shared_dir_path = conn._shared_dir_path
        self._uri = conn._uri
        self._session = conn._session

    def get_apps_service(self) -> AppsService:
        """
        Creates a handle to the application management service.

        Returns
        -------
        AppsService
            A handle to the application management service.
        """
        channel = self._conn
        return AppsService(channel)

    def get_debug_service(self) -> DebugService:
        """
        Creates a handle to the debug service.

        Returns
        -------
        DebugService
            A handle to the debug service.
        """
        channel = self._conn
        return DebugService(channel)

    def get_discovery_service(self) -> DiscoveryService:
        """
        Creates a handle to the discovery service.

        Returns
        -------
        DiscoveryService
            A handle to the discovery service.
        """
        channel = self._conn
        return DiscoveryService(channel)

    def get_file_transfer_service(self) -> FileTransferService:
        """
        Creates a handle to the file transfer service.

        The network-based implementation is used when both a URI and a
        session are set on this connection; otherwise the gRPC-based
        implementation is used together with the shared directory path.

        Returns
        -------
        FileTransferService
            A handle to the file transfer service.
        """
        if self._uri is None or self._session is None:
            return GrpcFileTransferService(self._conn, self._shared_dir_path)
        return NetworkFileTransferService(self._uri, self._session)

    def get_kv_service(self) -> KVService:
        """
        Creates a handle to the distributed key-value store service.

        Returns
        -------
        KVService
            A handle to the distributed key-value store service.
        """
        channel = self._conn
        return KVService(channel)

    def get_local_kv_service(self) -> LocalKVService:
        """
        Creates a handle to the local key-value store service.

        Returns
        -------
        LocalKVService
            A handle to the local key-value store service.
        """
        channel = self._conn
        return LocalKVService(channel)

    def get_neighbours_service(self) -> NeighboursService:
        """
        Creates a handle to the neighbours service.

        Returns
        -------
        NeighboursService
            A handle to the neighbours service.
        """
        channel = self._conn
        return NeighboursService(channel)

    def get_pub_sub_service(self) -> PubSubService:
        """
        Creates a handle to the pub-sub service.

        Returns
        -------
        PubSubService
            A handle to the pub-sub service.
        """
        channel = self._conn
        return PubSubService(channel)

    def get_request_response_service(self) -> RequestResponseService:
        """
        Creates a handle to the request-response service.

        Returns
        -------
        RequestResponseService
            A handle to the request-response service.
        """
        channel = self._conn
        return RequestResponseService(channel)

    async def get_id(self) -> str:
        """
        Returns the peer ID of the local runtime.

        Returns
        -------
        str
            The peer ID of the runtime.
        """
        discovery = DiscoveryStub(self._conn)
        own_peer = await discovery.GetOwnId(Empty())
        return own_peer.peer_id
An opened connection to the HyveOS runtime.
This class provides access to the various services provided by HyveOS.
An instance of this class is obtained by entering a `Connection` context manager.
Example
```python
from hyveos_sdk import Connection

async def main():
    async with Connection() as conn:
        peer_id = await conn.get_id()
        print(f'My peer ID: {peer_id}')
```
Methods
def get_apps_service(self) ‑> AppsService
-
Expand source code
def get_apps_service(self) -> AppsService:
    """
    Creates a handle to the application management service.

    Returns
    -------
    AppsService
        A handle to the application management service.
    """
    channel = self._conn
    return AppsService(channel)
Returns a handle to the application management service.
Returns
AppsService
- A handle to the application management service.
def get_debug_service(self) ‑> DebugService
-
Expand source code
def get_debug_service(self) -> DebugService:
    """
    Creates a handle to the debug service.

    Returns
    -------
    DebugService
        A handle to the debug service.
    """
    channel = self._conn
    return DebugService(channel)
def get_discovery_service(self) ‑> DiscoveryService
-
Expand source code
def get_discovery_service(self) -> DiscoveryService:
    """
    Creates a handle to the discovery service.

    Returns
    -------
    DiscoveryService
        A handle to the discovery service.
    """
    channel = self._conn
    return DiscoveryService(channel)
Returns a handle to the discovery service.
Returns
DiscoveryService
- A handle to the discovery service.
def get_file_transfer_service(self) ‑> FileTransferService
-
Expand source code
def get_file_transfer_service(self) -> FileTransferService:
    """
    Creates a handle to the file transfer service.

    The network-based implementation is used when both `_uri` and `_session`
    are set on this connection; otherwise the gRPC-based implementation is
    used together with the shared directory path.

    Returns
    -------
    FileTransferService
        A handle to the file transfer service.
    """
    if self._uri is None or self._session is None:
        return GrpcFileTransferService(self._conn, self._shared_dir_path)
    return NetworkFileTransferService(self._uri, self._session)
Returns a handle to the file transfer service.
Returns
FileTransferService
- A handle to the file transfer service.
async def get_id(self) ‑> str
-
Expand source code
async def get_id(self) -> str:
    """
    Returns the peer ID of the local runtime.

    Returns
    -------
    str
        The peer ID of the runtime.
    """
    discovery = DiscoveryStub(self._conn)
    own_peer = await discovery.GetOwnId(Empty())
    return own_peer.peer_id
Returns the peer ID of the local runtime.
Returns
str
- The peer ID of the runtime.
def get_kv_service(self) ‑> KVService
-
Expand source code
def get_kv_service(self) -> KVService:
    """
    Creates a handle to the distributed key-value store service.

    Returns
    -------
    KVService
        A handle to the distributed key-value store service.
    """
    channel = self._conn
    return KVService(channel)
Returns a handle to the distributed key-value store service.
Returns
KVService
- A handle to the distributed key-value store service.
def get_local_kv_service(self) ‑> LocalKVService
-
Expand source code
def get_local_kv_service(self) -> LocalKVService:
    """
    Creates a handle to the local key-value store service.

    Returns
    -------
    LocalKVService
        A handle to the local key-value store service.
    """
    channel = self._conn
    return LocalKVService(channel)
Returns a handle to the local key-value store service.
Returns
LocalKVService
- A handle to the local key-value store service.
def get_neighbours_service(self) ‑> NeighboursService
-
Expand source code
def get_neighbours_service(self) -> NeighboursService:
    """
    Creates a handle to the neighbours service.

    Returns
    -------
    NeighboursService
        A handle to the neighbours service.
    """
    channel = self._conn
    return NeighboursService(channel)
Returns a handle to the neighbours service.
Returns
NeighboursService
- A handle to the neighbours service.
def get_pub_sub_service(self) ‑> PubSubService
-
Expand source code
def get_pub_sub_service(self) -> PubSubService:
    """
    Creates a handle to the pub-sub service.

    Returns
    -------
    PubSubService
        A handle to the pub-sub service.
    """
    channel = self._conn
    return PubSubService(channel)
def get_request_response_service(self) ‑> RequestResponseService
-
Expand source code
def get_request_response_service(self) -> RequestResponseService:
    """
    Creates a handle to the request-response service.

    Returns
    -------
    RequestResponseService
        A handle to the request-response service.
    """
    channel = self._conn
    return RequestResponseService(channel)
Returns a handle to the request-response service.
Returns
RequestResponseService
- A handle to the request-response service.
class PubSubService (conn: grpc.aio._base_channel.Channel)
-
Expand source code
class PubSubService:
    """
    A handle to the pub-sub service.

    Offers subscribing to topics and publishing messages to topics.
    """

    def __init__(self, conn: Channel):
        self.stub = PubSubStub(conn)

    def subscribe(self, topic: str) -> ManagedStream[PubSubRecvMessage]:
        """
        Subscribes to a topic and returns a stream of messages published to
        that topic.

        Parameters
        ----------
        topic : str
            The topic to subscribe to

        Returns
        -------
        stream : ManagedStream[PubSubRecvMessage]
            Stream of received messages from the pub-sub topic
        """
        request = Topic(topic=topic)
        return ManagedStream(self.stub.Subscribe(request))

    async def publish(self, data: str | bytes, topic: str) -> bytes:
        """
        Publishes a message to a topic.

        Parameters
        ----------
        data : str | bytes
            Data to publish
        topic : str
            Topic to publish the data into

        Returns
        -------
        gossip_sub_message_id : bytes
            ID of the sent message
        """
        message = PubSubMessage(
            data=Data(data=enc(data)),
            topic=Topic(topic=topic),
        )
        receipt = await self.stub.Publish(message)
        return receipt.id
A handle to the pub-sub service.
Exposes methods to interact with the pub-sub service, like for subscribing to topics and publishing messages.
Methods
async def publish(self, data: str | bytes, topic: str) ‑> bytes
-
Expand source code
async def publish(self, data: str | bytes, topic: str) -> bytes:
    """
    Publishes a message to a topic.

    Parameters
    ----------
    data : str | bytes
        Data to publish
    topic : str
        Topic to publish the data into

    Returns
    -------
    gossip_sub_message_id : bytes
        ID of the sent message
    """
    message = PubSubMessage(
        data=Data(data=enc(data)),
        topic=Topic(topic=topic),
    )
    receipt = await self.stub.Publish(message)
    return receipt.id
Publishes a message to a topic.
Parameters
data
:str | bytes
- Data to publish
topic
:str
- Topic to publish the data into
Returns
gossip_sub_message_id
:bytes
- ID of the sent message
def subscribe(self, topic: str) ‑> ManagedStream[bridge_pb2.PubSubRecvMessage]
-
Expand source code
def subscribe(self, topic: str) -> ManagedStream[PubSubRecvMessage]:
    """
    Subscribes to a topic and returns a stream of messages published to
    that topic.

    Parameters
    ----------
    topic : str
        The topic to subscribe to

    Returns
    -------
    stream : ManagedStream[PubSubRecvMessage]
        Stream of received messages from the pub-sub topic
    """
    request = Topic(topic=topic)
    return ManagedStream(self.stub.Subscribe(request))
Subscribes to a topic and returns a stream of messages published to that topic.
Parameters
topic
:str
- The topic to subscribe to
Returns
stream
:ManagedStream[PubSubRecvMessage]
- Stream of received messages from the pub-sub topic
class RequestResponseService (conn: grpc.aio._base_channel.Channel)
-
Expand source code
class RequestResponseService:
    """
    A handle to the request-response service.

    Offers sending requests to peers, receiving requests from peers, and
    responding to received requests.
    """

    def __init__(self, conn: Channel):
        self.stub = ReqRespStub(conn)

    async def send_request(
        self, peer_id: str, data: str | bytes, topic: Optional[str] = None
    ) -> Response:
        """
        Sends a request with an optional topic to a peer and returns the
        response.

        The peer must be subscribed to the topic in order to receive the
        request. If `topic` is `None`, the peer must be subscribed to `None`
        as well.

        Parameters
        ----------
        peer_id : str
            The peer_id of the target
        data : str | bytes
            Data to send
        topic : str, optional
            Topic the peer should be subscribed to if this argument is
            specified (default: None)

        Returns
        -------
        response : Response
            Response from peer `peer_id` to the sent request, awaited
        """
        # An empty wrapper message stands for "no topic".
        wrapped_topic = (
            OptionalTopic()
            if topic is None
            else OptionalTopic(topic=Topic(topic=topic))
        )
        message = Message(data=Data(data=enc(data)), topic=wrapped_topic)
        request = SendRequest(peer=Peer(peer_id=peer_id), msg=message)
        return await self.stub.Send(request)

    def receive(
        self,
        query: Optional[str] = None,
        regex: bool = False,
    ) -> ManagedStream[RecvRequest]:
        """
        Subscribes to a topic and returns a stream of received requests.

        Parameters
        ----------
        query : str, optional
            Either a topic subscribed to or a regex that describes topics if
            this argument is specified (default: None)
        regex : bool
            Query is specified as a regex, not a single `topic` string
            (default: False)

        Returns
        -------
        stream : ManagedStream[RecvRequest]
            Stream of received requests from the specified topic
        """
        if query is None:
            # `regex` is irrelevant without a query: send the empty wrapper.
            wrapped_query = OptionalTopicQuery()
        elif regex:
            wrapped_query = OptionalTopicQuery(query=TopicQuery(regex=query))
        else:
            wrapped_query = OptionalTopicQuery(
                query=TopicQuery(topic=Topic(topic=query))
            )
        return ManagedStream(self.stub.Recv(wrapped_query))

    async def respond(
        self, seq: int, data: str | bytes, error: Optional[str] = None
    ) -> None:
        """
        Respond to a request received from receive()

        Parameters
        ----------
        seq : int
            Sequence number for request-response matching
        data : str | bytes
            Response message data. If error is specified, this won't reach
            the peer
        error : str
            Respond with an error message if an error occurred
            (default: None)
        """
        # `data` is only encoded when no error is given.
        reply = (
            Response(data=Data(data=enc(data)))
            if error is None
            else Response(error=error)
        )
        await self.stub.Respond(SendResponse(seq=seq, response=reply))
A handle to the request-response service.
Exposes methods to interact with the request-response service, like for sending and receiving requests, and for sending responses.
Methods
def receive(self, query: str | None = None, regex: bool = False) ‑> ManagedStream[bridge_pb2.RecvRequest]
-
Expand source code
def receive(
    self,
    query: Optional[str] = None,
    regex: bool = False,
) -> ManagedStream[RecvRequest]:
    """
    Subscribes to a topic and returns a stream of received requests.

    Parameters
    ----------
    query : str, optional
        Either a topic subscribed to or a regex that describes topics if
        this argument is specified (default: None)
    regex : bool
        Query is specified as a regex, not a single `topic` string
        (default: False)

    Returns
    -------
    stream : ManagedStream[RecvRequest]
        Stream of received requests from the specified topic
    """
    if query is None:
        # `regex` is irrelevant without a query: send the empty wrapper.
        wrapped_query = OptionalTopicQuery()
    elif regex:
        wrapped_query = OptionalTopicQuery(query=TopicQuery(regex=query))
    else:
        wrapped_query = OptionalTopicQuery(
            query=TopicQuery(topic=Topic(topic=query))
        )
    return ManagedStream(self.stub.Recv(wrapped_query))
Subscribes to a topic and returns a stream of received requests.
Parameters
query
:str, optional
- Either a topic subscribed to or a regex that describes topics if this argument is specified (default: None)
regex
:bool
- Query is specified as a regex, not a single `topic` string (default: False)
Returns
stream
:ManagedStream[RecvRequest]
- Stream of received requests from the specified topic
async def respond(self, seq: int, data: str | bytes, error: str | None = None) ‑> None
-
Expand source code
async def respond(
    self, seq: int, data: str | bytes, error: Optional[str] = None
) -> None:
    """
    Respond to a request received from receive()

    Parameters
    ----------
    seq : int
        Sequence number for request-response matching
    data : str | bytes
        Response message data. If error is specified, this won't reach the
        peer
    error : str
        Respond with an error message if an error occurred (default: None)
    """
    # `data` is only encoded when no error is given.
    reply = (
        Response(data=Data(data=enc(data)))
        if error is None
        else Response(error=error)
    )
    await self.stub.Respond(SendResponse(seq=seq, response=reply))
Respond to a request received from receive()
Parameters
seq
:int
- Sequence number for request-response matching
data
:str | bytes
- Response message data. If error is specified, this won't reach the peer
error
:str
- Respond with an error message if an error occurred (default: None)
async def send_request(self, peer_id: str, data: str | bytes, topic: str | None = None) ‑> bridge_pb2.Response
-
Expand source code
async def send_request(
    self, peer_id: str, data: str | bytes, topic: Optional[str] = None
) -> Response:
    """
    Sends a request with an optional topic to a peer and returns the
    response.

    The peer must be subscribed to the topic in order to receive the
    request. If `topic` is `None`, the peer must be subscribed to `None` as
    well.

    Parameters
    ----------
    peer_id : str
        The peer_id of the target
    data : str | bytes
        Data to send
    topic : str, optional
        Topic the peer should be subscribed to if this argument is specified
        (default: None)

    Returns
    -------
    response : Response
        Response from peer `peer_id` to the sent request, awaited
    """
    # An empty wrapper message stands for "no topic".
    wrapped_topic = (
        OptionalTopic()
        if topic is None
        else OptionalTopic(topic=Topic(topic=topic))
    )
    message = Message(data=Data(data=enc(data)), topic=wrapped_topic)
    request = SendRequest(peer=Peer(peer_id=peer_id), msg=message)
    return await self.stub.Send(request)
Sends a request with an optional topic to a peer and returns the response.
The peer must be subscribed to the topic in order to receive the request. If `topic` is `None`, the peer must be subscribed to `None` as well.
Parameters
peer_id
:str
- The peer_id of the target
data
:str | bytes
- Data to send
topic
:str, optional
- Topic the peer should be subscribed to if this argument is specified (default: None)
Returns
response
:Response
- Response from peer `peer_id` to the sent request, awaited