StreamManager
Stream_Manager utilities for anthropic ai model integration and logging utilities.
TextStreamWrapper
Wrapper for text stream events with callback support.
This class wraps a text stream to provide callback functionality for processing text events. It implements the iterator protocol to allow for seamless iteration over text content.
Attributes:
Name | Description |
---|---|
_wrapper | The underlying stream wrapper. |
_callback | Callback function to process stream events. |
__init__
Initialize the text stream wrapper.
Arguments:
Name | Description |
---|---|
wrapper | The underlying stream wrapper to delegate to. |
callback | Callback function to be called for each event. |
__iter__
Return the iterator object.
Returns:
Name | Description |
---|---|
[TextStreamWrapper](/sdk/python/references/logger/anthropic/stream_manager) | Self as the iterator. |
__next__
Get the next text content from the stream.
This method processes the next event from the stream, calls the callback function, and returns the text content if available.
Returns:
Name | Description |
---|---|
str | The text content from the event, or empty string if no text. |
Raises:
StopIteration
- When the stream is exhausted.
StreamWrapper
Wrapper for Anthropic MessageStreamManager with callback support.
This class wraps the Anthropic MessageStreamManager to provide callback functionality for processing stream events. It maintains compatibility with the original API while adding the ability to execute custom callbacks for each stream event.
The wrapper implements the context manager protocol and iterator protocol to provide seamless integration with existing code.
Attributes:
Name | Type | Description |
---|---|---|
_mgr | MessageStreamManager | The underlying stream manager. |
_callback | Callable | Callback function for processing events. |
_stream | MessageStream | The active stream instance. |
__init__
Initialize the stream wrapper.
Arguments:
Name | Type | Description |
---|---|---|
mgr | MessageStreamManager | The underlying stream manager to wrap. |
callback | Callable[[MessageStreamEvent], None] | Callback function |
to be called for each stream event.
Notes:
Does not call super().init() since we’re wrapping another manager.
__enter__
Enter the stream context and return a callback-enabled stream.
This method creates a custom MessageStream that wraps the original stream to add callback functionality for each event.
Returns:
Name | Description |
---|---|
MessageStream | A stream instance with callback support. |
__exit__
Exit the stream context.
This method delegates the context exit to the underlying stream manager to ensure proper cleanup and resource management.
Arguments:
Name | Description |
---|---|
exc_type | The exception type if an exception occurred. |
exc | The exception instance if an exception occurred. |
exc_tb | The traceback if an exception occurred. |
__iter__
Return the iterator object.
If no stream is active, this method will automatically enter the context to create one.
Returns:
Name | Description |
---|---|
[StreamWrapper](/sdk/python/references/logger/anthropic/stream_manager) | Self as the iterator. |
__next__
Get the next event from the stream.
If no stream is active, this method will automatically enter the context to create one.
Returns:
Name | Description |
---|---|
MessageStreamEvent | The next event from the stream. |
Raises:
StopIteration
- When the stream is exhausted.
__getattr__
Delegate attribute access to the wrapped manager.
This method ensures that any attributes or methods not explicitly defined in this wrapper are delegated to the underlying stream manager.
Arguments:
Name | Type | Description |
---|---|---|
name | str | The attribute name to access. |
Returns:
Name | Description |
---|---|
Any | The attribute value from the underlying manager. |