# Audio Streaming

## Reply On Pause

Typically, you want to run a Python function whenever a user has stopped speaking. You can do this by wrapping a Python generator with the `ReplyOnPause` class and passing it to the `handler` argument of the `Stream` object. The `ReplyOnPause` class handles the voice detection and turn-taking logic automatically!
```python
import numpy as np
from fastrtc import ReplyOnPause, Stream

def response(audio: tuple[int, np.ndarray]):  # (1)
    sample_rate, audio_array = audio
    # Generate the reply from the user's audio
    for audio_chunk in generate_response(sample_rate, audio_array):
        yield (sample_rate, audio_chunk)  # (2)

stream = Stream(
    handler=ReplyOnPause(response),
    modality="audio",
    mode="send-receive",
)
```
1. The Python generator receives the entire audio up until the user stopped speaking, as a tuple of the form `(sampling_rate, numpy array of audio)`. The array will have a shape of `(1, num_samples)`. You can also pass in additional input components.
2. The generator must yield audio chunks as a tuple of `(sampling_rate, numpy audio array)`. Each numpy audio array must have a shape of `(1, num_samples)`.
### Asynchronous

You can also use an async generator with `ReplyOnPause`.
### Parameters

You can customize the voice detection parameters by passing `algo_options` and `model_options` to the `ReplyOnPause` class.
```python
from fastrtc import AlgoOptions, ReplyOnPause, SileroVadOptions, Stream

stream = Stream(
    handler=ReplyOnPause(
        response,
        algo_options=AlgoOptions(
            audio_chunk_duration=0.6,
            started_talking_threshold=0.2,
            speech_threshold=0.1,
        ),
        model_options=SileroVadOptions(
            threshold=0.5,
            min_speech_duration_ms=250,
            min_silence_duration_ms=100,
        ),
    )
)
```
### Interruptions

By default, the `ReplyOnPause` handler allows you to interrupt the response at any time by speaking again. If you do not want to allow interruption, set the `can_interrupt` parameter to `False`.
```python
from fastrtc import ReplyOnPause, Stream

stream = Stream(
    handler=ReplyOnPause(
        response,
        can_interrupt=False,
    )
)
```
### Muting Response Audio

You can talk directly over the output audio and interruption will still work. However, in these cases the audio transcription may be incorrect, so it is best practice to mute the output audio before talking over it.
### Startup Function

You can pass a `startup_fn` to the `ReplyOnPause` class. This function is called when the connection is first established, which is helpful for generating initial responses.
```python
import numpy as np
from fastrtc import ReplyOnPause, Stream, get_tts_model

tts_client = get_tts_model()

def echo(audio: tuple[int, np.ndarray]):
    # Implement any iterator that yields audio
    # See "LLM Voice Chat" for a more complete example
    yield audio

def startup():
    for chunk in tts_client.stream_tts_sync("Welcome to the echo audio demo!"):
        yield chunk

stream = Stream(
    handler=ReplyOnPause(echo, startup_fn=startup),
    modality="audio",
    mode="send-receive",
    ui_args={"title": "Echo Audio"},
)
```
## Reply On Stopwords

You can configure your AI model to run whenever a set of "stop words" is detected, like "Hey Siri" or "computer", with the `ReplyOnStopWords` class.

The API is similar to `ReplyOnPause`, with the addition of a `stop_words` parameter.
```python
import numpy as np
from fastrtc import ReplyOnStopWords, Stream

def response(audio: tuple[int, np.ndarray]):
    """This function must yield audio frames"""
    ...
    for numpy_array in generated_audio:
        yield (sampling_rate, numpy_array, "mono")

stream = Stream(
    handler=ReplyOnStopWords(
        response,
        input_sample_rate=16000,
        stop_words=["computer"],  # (1)
    ),
    modality="audio",
    mode="send-receive",
)
```
1. The `stop_words` can be single words or pairs of words. Be sure to include common misspellings of your word for more robust detection, e.g. "llama", "lamma". In my experience, it's best to use two very distinct words like "ok computer" or "hello iris".
**Extra Dependencies**

The `ReplyOnStopWords` class requires the `stopword` extra. Run `pip install fastrtc[stopword]` to install it.
**English Only**

The `ReplyOnStopWords` class currently supports English only.
## Stream Handler

`ReplyOnPause` and `ReplyOnStopWords` are implementations of a `StreamHandler`. The `StreamHandler` is a low-level abstraction that gives you arbitrary control over how the input and output audio streams are created. The following example echoes back the user's audio.
```python
import numpy as np
from queue import Queue
from fastrtc import Stream, StreamHandler

class EchoHandler(StreamHandler):
    def __init__(self) -> None:
        super().__init__()
        self.queue = Queue()

    def receive(self, frame: tuple[int, np.ndarray]) -> None:  # (1)
        self.queue.put(frame)

    def emit(self) -> tuple[int, np.ndarray] | None:  # (2)
        return self.queue.get()

    def copy(self) -> StreamHandler:
        return EchoHandler()

    def shutdown(self) -> None:  # (3)
        pass

    def start_up(self) -> None:  # (4)
        pass

stream = Stream(
    handler=EchoHandler(),
    modality="audio",
    mode="send-receive",
)
```
1. The `StreamHandler` class implements three methods: `receive`, `emit`, and `copy`. The `receive` method is called when a new frame is received from the client, and the `emit` method returns the next frame to send to the client. The `copy` method is called at the beginning of the stream to ensure each user has a unique stream handler.
2. The `emit` method SHOULD NOT block. If a frame is not ready to be sent, the method should return `None`. If you need to wait for a frame, use `wait_for_item` from the `utils` module.
3. The `shutdown` method is called when the stream is closed. It should be used to clean up any resources.
4. The `start_up` method is called when the stream is first created. It should be used to initialize any resources. See Talk To OpenAI or Talk To Gemini for an example of a `StreamHandler` that uses the `start_up` method to connect to an API.
**Tip**

See Talk To Gemini for a complete example of a more complex stream handler.
**Warning**

The `emit` method should not block. If you need to wait for a frame, use `wait_for_item` from the `utils` module.
### Async Stream Handlers

It is also possible to create asynchronous stream handlers. This is very convenient for accessing async APIs from major LLM developers, like Google and OpenAI. The main difference is that `receive`, `emit`, and `start_up` are now defined with `async def`.

Here is a simple example of using `AsyncStreamHandler`:
```python
import asyncio
import numpy as np
from fastrtc import AsyncStreamHandler, Stream, wait_for_item

class AsyncEchoHandler(AsyncStreamHandler):
    """Simple async echo handler"""

    def __init__(self) -> None:
        super().__init__(input_sample_rate=24000)
        self.queue = asyncio.Queue()

    async def receive(self, frame: tuple[int, np.ndarray]) -> None:
        await self.queue.put(frame)

    async def emit(self) -> tuple[int, np.ndarray] | None:
        return await wait_for_item(self.queue)

    def copy(self):
        return AsyncEchoHandler()

    async def shutdown(self):
        pass

    async def start_up(self) -> None:
        pass
```
**Tip**

See Talk To Gemini and Talk To OpenAI for complete examples of `AsyncStreamHandler`s.
## Text To Speech

You can use an on-device text-to-speech model if you have the `tts` extra installed. Import the `get_tts_model` function and call it with the name of the model you want to use. At the moment, the only supported model is `kokoro`.

The `get_tts_model` function returns an object with three methods:

- `tts`: Synchronous text to speech.
- `stream_tts_sync`: Synchronous text to speech streaming.
- `stream_tts`: Asynchronous text to speech streaming.
```python
from fastrtc import get_tts_model

model = get_tts_model(model="kokoro")

# Inside a synchronous handler generator:
for audio in model.stream_tts_sync("Hello, world!"):
    yield audio

# Inside an asynchronous handler generator:
async for audio in model.stream_tts("Hello, world!"):
    yield audio

# One-shot synthesis:
audio = model.tts("Hello, world!")
```
**Tip**

You can customize the audio by passing an instance of `KokoroTTSOptions` to the method. See here for a list of available voices.
## Speech To Text

You can use an on-device speech-to-text model if you have the `stt` or `stopword` extra installed. Import the `get_stt_model` function and call it with the name of the model you want to use. At the moment, the only supported models are `moonshine/base` and `moonshine/tiny`.

The `get_stt_model` function returns an object with the following method:

- `stt`: Synchronous speech to text.
```python
import numpy as np
from fastrtc import get_stt_model

model = get_stt_model(model="moonshine/base")
# One second of random 16 kHz int16 audio
audio = (16000, np.random.randint(-32768, 32768, size=(1, 16000), dtype=np.int16))
text = model.stt(audio)
```
**Example**

See LLM Voice Chat for an example of using the `stt` method in a `ReplyOnPause` handler.

**English Only**

The `stt` model currently supports English only.
## Requesting Inputs

In `ReplyOnPause` and `ReplyOnStopWords`, any additional input data is automatically passed to your generator. For `StreamHandler`s, you must manually request the input data from the client.

You can do this by calling `await self.wait_for_args()` (for `AsyncStreamHandler`s) in either the `emit` or `receive` method. For a `StreamHandler`, call `self.wait_for_args_sync()`.

You can then access the additional input values via the `latest_args` property of the `StreamHandler`. The `latest_args` is a list storing each of the values. The 0th index is the dummy string `__webrtc_value__`.
### Considerations for Telephone Use

In order for your handler to work over the phone, make sure it does not expect any additional input data besides the audio. If you call `await self.wait_for_args()`, your stream will wait forever for the additional input data.

Stream handlers have a `phone_mode` property that is set to `True` if the stream is running over the phone. You can use this property to decide whether to wait for additional input data.

**`ReplyOnPause` and telephone use**

The generator you pass to `ReplyOnPause` must have default arguments for all arguments except the audio. If you yield `AdditionalOutputs`, they will be passed in as the input arguments to the generator the next time it is called.
**Tip**

See Talk To Claude for an example of a `ReplyOnPause` handler that is compatible with telephone usage. Notice how the input chatbot history is yielded as an `AdditionalOutput` on each invocation.
## Telephone Integration

You can integrate a `Stream` with a SIP provider like Twilio to set up your own phone number for your application.

### Setup Process

1. **Create a Twilio Account**: Sign up for a Twilio account and purchase a phone number with voice capabilities. With a trial account, only the phone number you used during registration will be able to connect to your `Stream`.
2. **Mount Your Stream**: Add your `Stream` to a FastAPI app using `stream.mount(app)` and run the server.
3. **Configure Twilio Webhook**: Point your Twilio phone number to your webhook URL.
### Configuring Twilio

To configure your Twilio phone number:

1. In your Twilio dashboard, navigate to `Manage` → `TwiML Apps` in the left sidebar.
2. Click `Create TwiML App`.
3. Set the `Voice URL` to your FastAPI app's URL with `/telephone/incoming` appended (e.g., `https://your-app-url.com/telephone/incoming`).
### Local Development with Ngrok
For local development, use ngrok to expose your local server:
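For example (assuming your FastAPI server listens on port 8000; adjust the port to your setup):

```shell
ngrok http 8000
```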
Then set your Twilio Voice URL to `https://your-ngrok-subdomain.ngrok.io/telephone/incoming`.
### Code Example
Here's a simple example of setting up a Twilio endpoint: