Module hyveos_sdk.services.req_res
Classes
class Data (*args, **kwargs)
-
A ProtocolMessage
Ancestors
- google._upb._message.Message
- google.protobuf.message.Message
Class variables
var DESCRIPTOR
class Message (*args, **kwargs)
-
A ProtocolMessage
Ancestors
- google._upb._message.Message
- google.protobuf.message.Message
Class variables
var DESCRIPTOR
class OptionalTopic (*args, **kwargs)
-
A ProtocolMessage
Ancestors
- google._upb._message.Message
- google.protobuf.message.Message
Class variables
var DESCRIPTOR
class OptionalTopicQuery (*args, **kwargs)
-
A ProtocolMessage
Ancestors
- google._upb._message.Message
- google.protobuf.message.Message
Class variables
var DESCRIPTOR
class Peer (*args, **kwargs)
-
A ProtocolMessage
Ancestors
- google._upb._message.Message
- google.protobuf.message.Message
Class variables
var DESCRIPTOR
class RecvRequest (*args, **kwargs)
-
A ProtocolMessage
Ancestors
- google._upb._message.Message
- google.protobuf.message.Message
Class variables
var DESCRIPTOR
class RequestResponseService (conn: grpc.aio._base_channel.Channel)
-
Expand source code
class RequestResponseService: """ A handle to the request-response service. Exposes methods to interact with the request-response service, like for sending and receiving requests, and for sending responses. """ def __init__(self, conn: Channel): self.stub = ReqRespStub(conn) async def send_request( self, peer_id: str, data: str | bytes, topic: Optional[str] = None ) -> Response: """ Sends a request with an optional topic to a peer and returns the response. The peer must be subscribed to the topic in order to receive the request. If `topic` is `None`, the peer must be subscribed to `None` as well. Parameters ---------- peer_id : str The peer_id of the target data : str | bytes Data to send topic : str, optional Topic the peer should be subscribed to if this argument is specified (default: None) Returns ------- response : Response Reponse from Peer `peer_id` to the sent request, awaited """ optional_topic = OptionalTopic() if topic is not None: optional_topic = OptionalTopic(topic=Topic(topic=topic)) send_data = Data(data=enc(data)) message = Message(data=send_data, topic=optional_topic) send_request = SendRequest(peer=Peer(peer_id=peer_id), msg=message) response = await self.stub.Send(send_request) return response def receive( self, query: Optional[str] = None, regex: bool = False, ) -> ManagedStream[RecvRequest]: """ Subscribes to a topic and returns a stream of received requests. Parameters ---------- query : str, optional Either a topic subscribed to or a regex that describes topics if this argument is specified (default: None) regex : bool Query is specified as a regex, not a single `topic` string (default: False) Returns ------- stream : ManagedStream[RecvRequest] Stream of received requests from the specified topic """ optional_topic_query = OptionalTopicQuery() if query is not None: if regex: optional_topic_query = OptionalTopicQuery(query=TopicQuery(regex=query)) else: optional_topic_query = OptionalTopicQuery( query=TopicQuery(topic=Topic(topic=query)) ) stream = self.stub.Recv(optional_topic_query) return ManagedStream(stream) async def respond( self, seq: int, data: str | bytes, error: Optional[str] = None ) -> None: """ Respond to a request received from receive() Parameters ---------- seq : int Sequence number for request-response matching data : str | bytes Reponse message data. If error is specified, this won't reach the peer error : str Respond with an error message if an error occurred (default: None) """ if error is not None: response = Response(error=error) else: response = Response(data=Data(data=enc(data))) send_response = SendResponse(seq=seq, response=response) await self.stub.Respond(send_response)
A handle to the request-response service.
Exposes methods to interact with the request-response service, like for sending and receiving requests, and for sending responses.
Methods
def receive(self, query: str | None = None, regex: bool = False) ‑> ManagedStream[bridge_pb2.RecvRequest]
-
Expand source code
def receive( self, query: Optional[str] = None, regex: bool = False, ) -> ManagedStream[RecvRequest]: """ Subscribes to a topic and returns a stream of received requests. Parameters ---------- query : str, optional Either a topic subscribed to or a regex that describes topics if this argument is specified (default: None) regex : bool Query is specified as a regex, not a single `topic` string (default: False) Returns ------- stream : ManagedStream[RecvRequest] Stream of received requests from the specified topic """ optional_topic_query = OptionalTopicQuery() if query is not None: if regex: optional_topic_query = OptionalTopicQuery(query=TopicQuery(regex=query)) else: optional_topic_query = OptionalTopicQuery( query=TopicQuery(topic=Topic(topic=query)) ) stream = self.stub.Recv(optional_topic_query) return ManagedStream(stream)
Subscribes to a topic and returns a stream of received requests.
Parameters
query
:str
, optional- Either a topic subscribed to or a regex that describes topics if this argument is specified (default: None)
regex
:bool
- Query is specified as a regex, not a single
topic
string (default: False)
Returns
stream
:ManagedStream[RecvRequest]
- Stream of received requests from the specified topic
async def respond(self, seq: int, data: str | bytes, error: str | None = None) ‑> None
-
Expand source code
async def respond( self, seq: int, data: str | bytes, error: Optional[str] = None ) -> None: """ Respond to a request received from receive() Parameters ---------- seq : int Sequence number for request-response matching data : str | bytes Reponse message data. If error is specified, this won't reach the peer error : str Respond with an error message if an error occurred (default: None) """ if error is not None: response = Response(error=error) else: response = Response(data=Data(data=enc(data))) send_response = SendResponse(seq=seq, response=response) await self.stub.Respond(send_response)
Respond to a request received from receive()
Parameters
seq
:int
- Sequence number for request-response matching
data
:str | bytes
- Reponse message data. If error is specified, this won't reach the peer
error
:str
- Respond with an error message if an error occurred (default: None)
async def send_request(self, peer_id: str, data: str | bytes, topic: str | None = None) ‑> bridge_pb2.Response
-
Expand source code
async def send_request( self, peer_id: str, data: str | bytes, topic: Optional[str] = None ) -> Response: """ Sends a request with an optional topic to a peer and returns the response. The peer must be subscribed to the topic in order to receive the request. If `topic` is `None`, the peer must be subscribed to `None` as well. Parameters ---------- peer_id : str The peer_id of the target data : str | bytes Data to send topic : str, optional Topic the peer should be subscribed to if this argument is specified (default: None) Returns ------- response : Response Reponse from Peer `peer_id` to the sent request, awaited """ optional_topic = OptionalTopic() if topic is not None: optional_topic = OptionalTopic(topic=Topic(topic=topic)) send_data = Data(data=enc(data)) message = Message(data=send_data, topic=optional_topic) send_request = SendRequest(peer=Peer(peer_id=peer_id), msg=message) response = await self.stub.Send(send_request) return response
Sends a request with an optional topic to a peer and returns the response.
The peer must be subscribed to the topic in order to receive the request. If
topic
isNone
, the peer must be subscribed toNone
as well.Parameters
peer_id
:str
- The peer_id of the target
data
:str | bytes
- Data to send
topic
:str
, optional- Topic the peer should be subscribed to if this argument is specified (default: None)
Returns
response
:Response
- Reponse from Peer
peer_id
to the sent request, awaited
class Response (*args, **kwargs)
-
A ProtocolMessage
Ancestors
- google._upb._message.Message
- google.protobuf.message.Message
Class variables
var DESCRIPTOR
class SendRequest (*args, **kwargs)
-
A ProtocolMessage
Ancestors
- google._upb._message.Message
- google.protobuf.message.Message
Class variables
var DESCRIPTOR
class SendResponse (*args, **kwargs)
-
A ProtocolMessage
Ancestors
- google._upb._message.Message
- google.protobuf.message.Message
Class variables
var DESCRIPTOR
class Topic (*args, **kwargs)
-
A ProtocolMessage
Ancestors
- google._upb._message.Message
- google.protobuf.message.Message
Class variables
var DESCRIPTOR
class TopicQuery (*args, **kwargs)
-
A ProtocolMessage
Ancestors
- google._upb._message.Message
- google.protobuf.message.Message
Class variables
var DESCRIPTOR