You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

266 lines
7.9 KiB

4 years ago
//! The client to the obs-websocket API and main entry point.
use std::{
sync::{atomic::AtomicU64, Arc},
use anyhow::{bail, Result};
use futures_util::{
stream::{SplitSink, Stream, StreamExt},
use log::{debug, error, trace};
use serde::de::DeserializeOwned;
use tokio::{
sync::{broadcast, oneshot, Mutex},
use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
use crate::{
requests::{Request, RequestType},
responses::{AuthRequired, Response},
pub use self::{
general::General, outputs::Outputs, profiles::Profiles, recording::Recording,
replay_buffer::ReplayBuffer, scene_collections::SceneCollections, scene_items::SceneItems,
scenes::Scenes, sources::Sources, streaming::Streaming, studio_mode::StudioMode,
mod general;
mod outputs;
mod profiles;
mod recording;
mod replay_buffer;
mod scene_collections;
mod scene_items;
mod scenes;
mod sources;
mod streaming;
mod studio_mode;
mod transitions;
/// The client is the main entry point to access the obs-websocket API. It allows to call various
/// functions to remote control an OBS instance as well as to listen to events caused by the user
/// by interacting with OBS.
pub struct Client {
write: Mutex<MessageWriter>,
id_counter: AtomicU64,
receivers: Arc<Mutex<HashMap<String, oneshot::Sender<serde_json::Value>>>>,
event_sender: broadcast::Sender<Event>,
type MessageWriter = SplitSink<WebSocketStream<TcpStream>, Message>;
impl Client {
/// Connect to a obs-websocket instance on the given host and port.
pub async fn connect(host: impl AsRef<str>, port: u16) -> Result<Self> {
let (socket, _) =
tokio_tungstenite::connect_async(format!("ws://{}:{}", host.as_ref(), port)).await?;
let (write, mut read) = socket.split();
let receivers = Arc::new(Mutex::new(HashMap::<
let receivers2 = Arc::clone(&receivers);
let (event_sender, _) = broadcast::channel(100);
let events_tx = event_sender.clone();
tokio::spawn(async move {
while let Some(Ok(msg)) = {
trace!("{}", msg);
let temp: Result<()> = async {
let json = serde_json::from_str::<serde_json::Value>(&msg.into_text()?)?;
if let Some(message_id) = json
.and_then(|obj| obj.get("message-id"))
.and_then(|id| id.as_str())
debug!("got message with id {}", message_id);
if let Some(tx) = receivers2.lock().await.remove(message_id) {
} else {
let event = serde_json::from_value(json)?;
if let Err(e) = temp {
error!("{:?}", e);
let write = Mutex::new(write);
let id_counter = AtomicU64::new(1);
Ok(Self {
async fn send_message<T>(&self, req: RequestType) -> Result<T>
T: DeserializeOwned,
let id = self
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
let req = Request {
message_id: id.clone(),
ty: req,
let json = serde_json::to_string(&req)?;
let (tx, rx) = oneshot::channel();
self.receivers.lock().await.insert(id, tx);
debug!("sending message: {}", json);
let resp = rx.await?;
if let Some(error) = resp
.and_then(|o| o.get("error"))
.and_then(|e| e.as_str())
bail!("{}", error);
.map(|r| r.details)
/// Login to the OBS websocket if an authentication is required.
pub async fn login(&self, password: Option<impl AsRef<str>>) -> Result<()> {
let auth_required = self.general().get_auth_required().await?;
if let AuthRequired {
auth_required: true,
challenge: Some(challenge),
salt: Some(salt),
} = auth_required
match password {
Some(password) => {
let auth = Self::create_auth_response(&challenge, &salt, password.as_ref());
None => bail!("authentication required but no password provided"),
fn create_auth_response(challenge: &str, salt: &str, password: &str) -> String {
use sha2::{Digest, Sha256};
let mut hasher = Sha256::new();
let mut auth = String::with_capacity(Sha256::output_size() * 4 / 3 + 4);
base64::encode_config_buf(hasher.finalize_reset(), base64::STANDARD, &mut auth);
base64::encode_config_buf(hasher.finalize(), base64::STANDARD, &mut auth);
/// Get a stream of events. Each call to this function creates a new listener, therefore it's
/// recommended to keep the stream around and iterate over it.
/// **Note**: To be able to iterate over the stream you have to pin it with
/// [`futures_util::pin_mut`] for example.
pub fn events(&self) -> impl Stream<Item = Event> {
let mut receiver = self.event_sender.subscribe();
async_stream::stream! {
while let Ok(event) = receiver.recv().await {
yield event;
/// Access general API functions.
pub fn general(&self) -> General<'_> {
General { client: self }
/// Access API functions related to sources.
pub fn sources(&self) -> Sources<'_> {
Sources { client: self }
/// Access API functions related to outputs.
pub fn outputs(&self) -> Outputs<'_> {
Outputs { client: self }
/// Access API functions related to profiles.
pub fn profiles(&self) -> Profiles<'_> {
Profiles { client: self }
/// Access API functions related to recording.
pub fn recording(&self) -> Recording<'_> {
Recording { client: self }
/// Access API functions related to the replay buffer.
pub fn replay_buffer(&self) -> ReplayBuffer<'_> {
ReplayBuffer { client: self }
/// Access API functions related to scene collections.
pub fn scene_collections(&self) -> SceneCollections<'_> {
SceneCollections { client: self }
/// Access API functions related to scene items.
pub fn scene_items(&self) -> SceneItems<'_> {
SceneItems { client: self }
/// Access API functions related to scenes.
pub fn scenes(&self) -> Scenes<'_> {
Scenes { client: self }
/// Access API functions related to streaming.
pub fn streaming(&self) -> Streaming<'_> {
Streaming { client: self }
/// Access API functions related to the studio mode.
pub fn studio_mode(&self) -> StudioMode<'_> {
StudioMode { client: self }
/// Access API functions related to transitions.
pub fn transitions(&self) -> Transitions<'_> {
Transitions { client: self }