Module hyveos_sdk.services.stream

Classes

class ManagedStream (stream)
Expand source code
class ManagedStream(Generic[T]):
    def __init__(self, stream):
        self.stream = stream

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.stream.cancel()

    def __aiter__(self) -> AsyncIterator[T]:
        return self.stream.__aiter__()

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

class Mapping(Generic[KT, VT]): def getitem(self, key: KT) -> VT: … # Etc.

This class can then be used as follows::

def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default

Ancestors

  • typing.Generic