Skip to content

Connecting via API

Before continuing, select the modality, mode of your Stream and whether you're using WebRTC or WebSockets.

Sample Code

Message Format

Over both WebRTC and WebSocket, the server can send messages of the following format:

{
    "type": `send_input` | `fetch_output` | `stopword` | `error` | `warning` | `log`,
    "data": string | object
}
  • send_input: Send any input data for the handler to the server. See Additional Inputs for more details.
  • fetch_output: An instance of AdditionalOutputs is sent to the server.
  • stopword: The stopword has been detected. See ReplyOnStopWords for more details.
  • error: An error occurred. The data will be a string containing the error message.
  • warning: A warning occurred. The data will be a string containing the warning message.
  • log: A log message. The data will be a string containing the log message.

The ReplyOnPause handler can also send the following log messages.

{
    "type": "log",
    "data": "pause_detected" | "response_starting"
}

Tip

When using WebRTC, the messages will be encoded as strings, so parse as JSON before using.

Additional Inputs

When the send_input message is received, update the inputs of your handler however you like by using the set_input method of the Stream object.

A common pattern is to use a POST request to send the updated data. The first argument to the set_input method is the webrtc_id of the handler.

from pydantic import BaseModel, Field

class InputData(BaseModel):
    webrtc_id: str
    conf_threshold: float = Field(ge=0, le=1)


@app.post("/input_hook")
async def _(data: InputData):
    stream.set_input(data.webrtc_id, data.conf_threshold)

The updated data will be passed to the handler on the next call.

Additional Outputs

The fetch_output message is sent to the client whenever an instance of AdditionalOutputs is available. You can access the latest output data by calling the fetch_latest_output method of the Stream object.

However, rather than fetching each output manually, a common pattern is to fetch the entire stream of output data by calling the output_stream method.

Here is an example:

from fastapi.responses import StreamingResponse

@app.get("/updates")
async def stream_updates(webrtc_id: str):
    async def output_stream():
        async for output in stream.output_stream(webrtc_id):
            # Output is the AdditionalOutputs instance
            # Be sure to serialize it however you would like
            yield f"data: {output.args[0]}\n\n"

    return StreamingResponse(
        output_stream(), 
        media_type="text/event-stream"
    )

Handling Errors

When connecting via WebRTC, the server will respond to the /webrtc/offer route with a JSON response. If there are too many connections, the server will respond with a 429 error.

{
    "status": "failed",
    "meta": {
        "error": "concurrency_limit_reached",
        "limit": 10
    }

Over WebSocket, the server will send the same message before closing the connection.