Connecting via API
Before continuing, select the modality
, mode
of your Stream
and whether you're using WebRTC
or WebSocket
s.
Sample Code
Message Format
Over both WebRTC and WebSocket, the server can send messages of the following format:
{
"type": `send_input` | `fetch_output` | `stopword` | `error` | `warning` | `log`,
"data": string | object
}
send_input
: Send any input data for the handler to the server. SeeAdditional Inputs
for more details.fetch_output
: An instance ofAdditionalOutputs
is sent to the server.stopword
: The stopword has been detected. SeeReplyOnStopWords
for more details.error
: An error occurred. Thedata
will be a string containing the error message.warning
: A warning occurred. Thedata
will be a string containing the warning message.log
: A log message. Thedata
will be a string containing the log message.
The ReplyOnPause
handler can also send the following log
messages.
Tip
When using WebRTC, the messages will be encoded as strings, so parse as JSON before using.
Additional Inputs
When the send_input
message is received, update the inputs of your handler however you like by using the set_input
method of the Stream
object.
A common pattern is to use a POST
request to send the updated data. The first argument to the set_input
method is the webrtc_id
of the handler.
from pydantic import BaseModel, Field
class InputData(BaseModel):
webrtc_id: str
conf_threshold: float = Field(ge=0, le=1)
@app.post("/input_hook")
async def _(data: InputData):
stream.set_input(data.webrtc_id, data.conf_threshold)
The updated data will be passed to the handler on the next call.
Additional Outputs
The fetch_output
message is sent to the client whenever an instance of AdditionalOutputs
is available. You can access the latest output data by calling the fetch_latest_output
method of the Stream
object.
However, rather than fetching each output manually, a common pattern is to fetch the entire stream of output data by calling the output_stream
method.
Here is an example:
from fastapi.responses import StreamingResponse
@app.get("/updates")
async def stream_updates(webrtc_id: str):
async def output_stream():
async for output in stream.output_stream(webrtc_id):
# Output is the AdditionalOutputs instance
# Be sure to serialize it however you would like
yield f"data: {output.args[0]}\n\n"
return StreamingResponse(
output_stream(),
media_type="text/event-stream"
)
Handling Errors
When connecting via WebRTC
, the server will respond to the /webrtc/offer
route with a JSON response. If there are too many connections, the server will respond with a 429 error.
Over WebSocket
, the server will send the same message before closing the connection.