diff --git a/src/py/reactpy/reactpy/backend/hooks.py b/src/py/reactpy/reactpy/backend/hooks.py
index 19ad114ed..7918e87c4 100644
--- a/src/py/reactpy/reactpy/backend/hooks.py
+++ b/src/py/reactpy/reactpy/backend/hooks.py
@@ -3,12 +3,15 @@
 from collections.abc import MutableMapping
 from typing import Any
 
+from reactpy.backend.messenger import Messenger
 from reactpy.backend.types import Connection, Location
 from reactpy.core.hooks import Context, create_context, use_context
 
 # backend implementations should establish this context at the root of an app
 ConnectionContext: Context[Connection[Any] | None] = create_context(None)
 
+MessengerContext: Context[Messenger | None] = create_context(None)
+
 
 def use_connection() -> Connection[Any]:
     """Get the current :class:`~reactpy.backend.types.Connection`."""
@@ -27,3 +30,12 @@ def use_scope() -> MutableMapping[str, Any]:
 def use_location() -> Location:
     """Get the current :class:`~reactpy.backend.types.Connection`'s location."""
     return use_connection().location
+
+
+def use_messenger() -> Messenger:
+    """Get the current :class:`~reactpy.core.serve.Messenger`."""
+    messenger = use_context(MessengerContext)
+    if messenger is None:  # nocov
+        msg = "No backend established a messenger."
+        raise RuntimeError(msg)
+    return messenger
diff --git a/src/py/reactpy/reactpy/backend/messenger.py b/src/py/reactpy/reactpy/backend/messenger.py
new file mode 100644
index 000000000..4d288878c
--- /dev/null
+++ b/src/py/reactpy/reactpy/backend/messenger.py
@@ -0,0 +1,61 @@
+from __future__ import annotations
+
+from collections.abc import AsyncIterator, Awaitable
+from typing import Callable
+
+from anyio import Event, create_memory_object_stream, create_task_group
+from anyio.abc import ObjectReceiveStream, ObjectSendStream
+
+from reactpy.core.types import Message
+
+_MessageStream = tuple[ObjectSendStream[Message], ObjectReceiveStream[Message]]
+
+
+class Messenger:
+    """A messenger for sending and receiving messages"""
+
+    def __init__(self) -> None:
+        self._task_group = create_task_group()
+        self._streams: dict[str, list[_MessageStream]] = {}
+        self._recv_began: dict[str, Event] = {}
+
+    def start_producer(self, producer: Callable[[], AsyncIterator[Message]]) -> None:
+        """Add a message producer"""
+
+        async def _producer() -> None:
+            async for message in producer():
+                await self.send(message)
+
+        self._task_group.start_soon(_producer)
+
+    def start_consumer(
+        self, message_type: str, consumer: Callable[[Message], Awaitable[None]]
+    ) -> None:
+        """Add a message consumer"""
+
+        async def _consumer() -> None:
+            async for message in self.receive(message_type):
+                self._task_group.start_soon(consumer, message)
+
+        self._task_group.start_soon(_consumer)
+
+    async def send(self, message: Message) -> None:
+        """Send a message to all consumers of the message type"""
+        for send, _ in self._streams.get(message["type"], []):
+            await send.send(message)
+
+    async def receive(self, message_type: str) -> AsyncIterator[Message]:
+        """Receive messages of a given type"""
+        send, recv = create_memory_object_stream()
+        self._streams.setdefault(message_type, []).append((send, recv))
+        async with recv:
+            async with send:
+                async for message in recv:
+                    yield message
+
+    async def __aenter__(self) -> Messenger:
+        await self._task_group.__aenter__()
+        return self
+
+    async def __aexit__(self, *args) -> None:
+        await self._task_group.__aexit__(*args)
diff --git a/src/py/reactpy/reactpy/backend/starlette.py b/src/py/reactpy/reactpy/backend/starlette.py
index 3a9695b33..9988789ff 100644
--- a/src/py/reactpy/reactpy/backend/starlette.py
+++ b/src/py/reactpy/reactpy/backend/starlette.py
@@ -3,10 +3,12 @@
 import asyncio
 import json
 import logging
-from collections.abc import Awaitable
+from collections.abc import AsyncIterator, Awaitable
 from dataclasses import dataclass
 from typing import Any, Callable
 
+from py.reactpy.reactpy.backend.hooks import MessengerContext
+from py.reactpy.reactpy.core.serve import Messenger
 from starlette.applications import Starlette
 from starlette.middleware.cors import CORSMiddleware
 from starlette.requests import Request
@@ -141,6 +143,25 @@ async def model_stream(socket: WebSocket) -> None:
         pathname = pathname[len(options.url_prefix) :] or "/"
         search = socket.scope["query_string"].decode()
 
+        async with Messenger() as msgr:
+            async with Layout(
+                MessengerContext(
+                    ConnectionContext(
+                        component(),
+                        value=Connection(
+                            scope=socket.scope,
+                            location=Location(pathname, f"?{search}" if search else ""),
+                            carrier=socket,
+                        ),
+                    ),
+                    value=msgr,
+                )
+            ) as layout:
+                msgr.start_consumer("layout-event", layout.deliver)
+                msgr.start_producer(layout.renders)
+                msgr.start_consumer("layout-update", send)
+                msgr.start_producer(recv)
+
         try:
             await serve_layout(
                 Layout(
@@ -166,7 +187,8 @@ def _make_send_recv_callbacks(
     async def sock_send(value: Any) -> None:
         await socket.send_text(json.dumps(value))
 
-    async def sock_recv() -> Any:
-        return json.loads(await socket.receive_text())
+    async def sock_recv() -> AsyncIterator[Any]:
+        while True:
+            yield json.loads(await socket.receive_text())
 
     return sock_send, sock_recv
diff --git a/src/py/reactpy/reactpy/core/events.py b/src/py/reactpy/reactpy/core/events.py
index cd5de3228..8d63a68f0 100644
--- a/src/py/reactpy/reactpy/core/events.py
+++ b/src/py/reactpy/reactpy/core/events.py
@@ -2,7 +2,7 @@
 
 import asyncio
 from collections.abc import Sequence
-from typing import Any, Callable, Literal, overload
+from typing import Any, Callable, Literal, Optional, overload
 
 from anyio import create_task_group
 
diff --git a/src/py/reactpy/reactpy/core/layout.py b/src/py/reactpy/reactpy/core/layout.py
index df24a9a0a..30420a86a 100644
--- a/src/py/reactpy/reactpy/core/layout.py
+++ b/src/py/reactpy/reactpy/core/layout.py
@@ -3,7 +3,7 @@
 import abc
 import asyncio
 from collections import Counter
-from collections.abc import Iterator
+from collections.abc import AsyncIterator, Iterator
 from contextlib import ExitStack
 from logging import getLogger
 from typing import (
@@ -99,6 +99,11 @@ async def deliver(self, event: LayoutEventMessage) -> None:
                 "does not exist or its component unmounted"
             )
 
+    async def renders(self) -> AsyncIterator[LayoutUpdateMessage]:
+        """Yield all available renders"""
+        while True:
+            yield await self.render()
+
     async def render(self) -> LayoutUpdateMessage:
         """Await the next available render. This will block until a component is updated"""
         while True:
diff --git a/src/py/reactpy/reactpy/core/serve.py b/src/py/reactpy/reactpy/core/serve.py
index 3a530e854..0efd1d4ef 100644
--- a/src/py/reactpy/reactpy/core/serve.py
+++ b/src/py/reactpy/reactpy/core/serve.py
@@ -1,14 +1,20 @@
 from __future__ import annotations
 
-from collections.abc import Awaitable
+import warnings
+from collections.abc import AsyncIterator, Awaitable
 from logging import getLogger
 from typing import Callable
 
-from anyio import create_task_group
-from anyio.abc import TaskGroup
+from anyio import Event, create_memory_object_stream, create_task_group
+from anyio.abc import ObjectReceiveStream, ObjectSendStream, TaskGroup
 
 from reactpy.config import REACTPY_DEBUG_MODE
-from reactpy.core.types import LayoutEventMessage, LayoutType, LayoutUpdateMessage
+from reactpy.core.types import (
+    LayoutEventMessage,
+    LayoutType,
+    LayoutUpdateMessage,
+    Message,
+)
 
 logger = getLogger(__name__)
 
@@ -37,6 +43,11 @@ async def serve_layout(
     recv: RecvCoroutine,
 ) -> None:
     """Run a dispatch loop for a single view instance"""
+    warnings.warn(
+        "serve_layout is deprecated. Use a Messenger object instead.",
+        DeprecationWarning,
+        stacklevel=2,
+    )
     async with layout:
         try:
             async with create_task_group() as task_group:
diff --git a/src/py/reactpy/reactpy/core/types.py b/src/py/reactpy/reactpy/core/types.py
index 45f300f4f..419208a26 100644
--- a/src/py/reactpy/reactpy/core/types.py
+++ b/src/py/reactpy/reactpy/core/types.py
@@ -2,7 +2,7 @@
 
 import sys
 from collections import namedtuple
-from collections.abc import Mapping, Sequence
+from collections.abc import AsyncIterator, Mapping, Sequence
 from types import TracebackType
 from typing import (
     TYPE_CHECKING,
@@ -73,6 +73,9 @@ class LayoutType(Protocol[_Render, _Event]):
     async def render(self) -> _Render:
         """Render an update to a view"""
 
+    async def renders(self) -> AsyncIterator[_Render]:
+        """Render a series of updates to a view"""
+
     async def deliver(self, event: _Event) -> None:
         """Relay an event to its respective handler"""
 
@@ -213,7 +216,14 @@ def __call__(
         ...
 
 
-class LayoutUpdateMessage(TypedDict):
+class Message(TypedDict):
+    """Base class for all messages"""
+
+    type: str
+    """The type of message"""
+
+
+class LayoutUpdateMessage(Message):
     """A message describing an update to a layout"""
 
     type: Literal["layout-update"]
@@ -224,7 +234,7 @@ class LayoutUpdateMessage(TypedDict):
     """The model to assign at the given JSON Pointer path"""
 
 
-class LayoutEventMessage(TypedDict):
+class LayoutEventMessage(Message):
     """Message describing an event originating from an element in the layout"""
 
     type: Literal["layout-event"]