Module hyveos_sdk.services.stream
Classes
class ManagedStream (stream)
-
Expand source code
class ManagedStream(Generic[T]): def __init__(self, stream): self.stream = stream async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_val, exc_tb): self.stream.cancel() def __aiter__(self) -> AsyncIterator[T]: return self.stream.__aiter__()
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
class Mapping(Generic[KT, VT]): def getitem(self, key: KT) -> VT: … # Etc.
This class can then be used as follows::
def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
Ancestors
- typing.Generic