use serde::Serialize; use std::io::Read; use std::process::{Command, Stdio}; use std::ffi::OsString; use std::sync::{ atomic::{AtomicBool, AtomicU32, Ordering}, mpsc, Arc, Mutex, }; use std::time::Duration; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use ringbuf::HeapRb; #[cfg(windows)] use std::os::windows::process::CommandExt; #[cfg(windows)] const CREATE_NO_WINDOW: u32 = 0x08000000; fn command_hidden(program: &OsString) -> Command { let mut cmd = Command::new(program); #[cfg(windows)] { cmd.creation_flags(CREATE_NO_WINDOW); } cmd } #[derive(Debug, Clone, Serialize, PartialEq, Eq)] #[serde(rename_all = "lowercase")] pub enum PlayerStatus { Idle, Buffering, Playing, Stopped, Error, } #[derive(Debug, Clone, Serialize)] pub struct PlayerState { pub status: PlayerStatus, pub url: Option, pub volume: f32, pub error: Option, } impl Default for PlayerState { fn default() -> Self { Self { status: PlayerStatus::Idle, url: None, volume: 0.5, error: None, } } } pub struct PlayerShared { pub state: Mutex, } impl PlayerShared { pub fn snapshot(&self) -> PlayerState { self.state.lock().unwrap().clone() } } #[derive(Debug)] pub enum PlayerCommand { Play { url: String }, // Cast-only playback: decode to PCM and keep it available for cast taps, // but do not open a CPAL output stream. PlayCast { url: String }, Stop, SetVolume { volume: f32 }, CastTapStart { port: u16, reply: mpsc::Sender>, }, CastTapStop, Shutdown, } #[derive(Clone)] pub struct PlayerController { pub tx: mpsc::Sender, } pub fn spawn_player_thread(shared: std::sync::Arc) -> PlayerController { let (tx, rx) = mpsc::channel::(); let shared_for_thread = std::sync::Arc::clone(&shared); std::thread::spawn(move || player_thread(shared_for_thread, rx)); PlayerController { tx } } fn clamp01(v: f32) -> f32 { if v.is_nan() { 0.0 } else if v < 0.0 { 0.0 } else if v > 1.0 { 1.0 } else { v } } fn volume_to_bits(v: f32) -> u32 { clamp01(v).to_bits() } fn volume_from_bits(bits: u32) -> f32 { f32::from_bits(bits) } fn set_status(shared: &std::sync::Arc, status: PlayerStatus) { let mut s = shared.state.lock().unwrap(); if s.status != status { s.status = status; } } fn set_error(shared: &std::sync::Arc, message: String) { let mut s = shared.state.lock().unwrap(); s.status = PlayerStatus::Error; s.error = Some(message); } pub(crate) fn ffmpeg_command() -> OsString { // Step 2: external ffmpeg binary. // Lookup order: // 1) RADIOPLAYER_FFMPEG (absolute or relative) // 2) ffmpeg next to the application executable // 3) PATH lookup (ffmpeg / ffmpeg.exe) if let Ok(p) = std::env::var("RADIOPLAYER_FFMPEG") { if !p.trim().is_empty() { return OsString::from(p); } } let local_name = if cfg!(windows) { "ffmpeg.exe" } else { "ffmpeg" }; if let Ok(exe) = std::env::current_exe() { if let Some(dir) = exe.parent() { // Common locations depending on bundler/platform. let candidates = [ dir.join(local_name), // Some packagers place resources in a sibling folder. dir.join("resources").join(local_name), dir.join("Resources").join(local_name), // Or one level above. dir.join("..").join("resources").join(local_name), dir.join("..").join("Resources").join(local_name), ]; for candidate in candidates { if candidate.exists() { return candidate.into_os_string(); } } } } OsString::from(local_name) } pub fn preflight_ffmpeg_only() -> Result<(), String> { let ffmpeg = ffmpeg_command(); let status = command_hidden(&ffmpeg) .arg("-version") .stdout(Stdio::null()) .stderr(Stdio::null()) .status() .map_err(|e| { let ffmpeg_disp = ffmpeg.to_string_lossy(); format!( "FFmpeg not available ({ffmpeg_disp}): {e}. Set RADIOPLAYER_FFMPEG, bundle ffmpeg next to the app, or install ffmpeg on PATH." ) })?; if !status.success() { return Err("FFmpeg exists but returned non-zero for -version".to_string()); } Ok(()) } pub fn preflight_check() -> Result<(), String> { // Ensure we have an output device up-front so UI gets a synchronous error. let host = cpal::default_host(); let device = host .default_output_device() .ok_or_else(|| "No default audio output device".to_string())?; let _ = device .default_output_config() .map_err(|e| format!("Failed to get output config: {e}"))?; preflight_ffmpeg_only()?; Ok(()) } #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum PipelineMode { WithOutput, Headless, } struct CastTapProc { child: std::process::Child, writer_join: Option>, } struct Pipeline { stop_flag: Arc, volume_bits: Arc, _stream: Option, decoder_join: Option>, cast_tx: Arc>>>>, cast_proc: Option, sample_rate: u32, channels: u16, } impl Pipeline { fn start(shared: std::sync::Arc, url: String, mode: PipelineMode) -> Result { let (device, sample_format, cfg, sample_rate, channels) = match mode { PipelineMode::WithOutput => { let host = cpal::default_host(); let device = host .default_output_device() .ok_or_else(|| "No default audio output device".to_string())?; let default_cfg = device .default_output_config() .map_err(|e| format!("Failed to get output config: {e}"))?; let sample_format = default_cfg.sample_format(); let cfg = default_cfg.config(); let sample_rate = cfg.sample_rate.0; let channels = cfg.channels as u16; (Some(device), Some(sample_format), Some(cfg), sample_rate, channels) } PipelineMode::Headless => { // For cast-only, pick a sane, widely-supported PCM format. // This does not depend on an audio device. (None, None, None, 48_000u32, 2u16) } }; // 5 seconds of PCM buffering (i16 samples) let (mut prod_opt, mut cons_opt) = if mode == PipelineMode::WithOutput { let cfg = cfg.as_ref().expect("cfg must exist for WithOutput"); let capacity_samples = (sample_rate as usize) .saturating_mul(cfg.channels as usize) .saturating_mul(5); let rb = HeapRb::::new(capacity_samples); let (prod, cons) = rb.split(); (Some(prod), Some(cons)) } else { (None, None) }; let stop_flag = Arc::new(AtomicBool::new(false)); let volume_bits = Arc::new(AtomicU32::new({ let s = shared.state.lock().unwrap(); volume_to_bits(s.volume) })); let cast_tx: Arc>>>> = Arc::new(Mutex::new(None)); // Decoder thread: spawns ffmpeg, reads PCM, writes into ring buffer. let stop_for_decoder = Arc::clone(&stop_flag); let shared_for_decoder = std::sync::Arc::clone(&shared); let decoder_url = url.clone(); let cast_tx_for_decoder = Arc::clone(&cast_tx); let decoder_join = std::thread::spawn(move || { let mut backoff_ms: u64 = 250; let mut pushed_since_start: usize = 0; let playing_threshold_samples = (sample_rate as usize) .saturating_mul(channels as usize) .saturating_div(4); // ~250ms 'outer: loop { if stop_for_decoder.load(Ordering::SeqCst) { break; } set_status(&shared_for_decoder, PlayerStatus::Buffering); let ffmpeg = ffmpeg_command(); let ffmpeg_disp = ffmpeg.to_string_lossy(); let mut child = match command_hidden(&ffmpeg) .arg("-nostdin") .arg("-hide_banner") .arg("-loglevel") .arg("warning") // basic reconnect flags (best-effort; not all protocols honor these) .arg("-reconnect") .arg("1") .arg("-reconnect_streamed") .arg("1") .arg("-reconnect_delay_max") .arg("5") .arg("-i") .arg(&decoder_url) .arg("-vn") .arg("-ac") .arg(channels.to_string()) .arg("-ar") .arg(sample_rate.to_string()) .arg("-f") .arg("s16le") .arg("pipe:1") .stdout(Stdio::piped()) .stderr(Stdio::null()) .spawn() { Ok(c) => c, Err(e) => { // If ffmpeg isn't available, this is a hard failure. set_error( &shared_for_decoder, format!( "Failed to start ffmpeg ({ffmpeg_disp}): {e}. Set RADIOPLAYER_FFMPEG, bundle ffmpeg next to the app, or install ffmpeg on PATH." ), ); break; } }; let mut stdout = match child.stdout.take() { Some(s) => s, None => { set_error(&shared_for_decoder, "ffmpeg stdout not available".to_string()); let _ = child.kill(); break; } }; let mut buf = [0u8; 8192]; let mut leftover: Option = None; loop { if stop_for_decoder.load(Ordering::SeqCst) { let _ = child.kill(); let _ = child.wait(); break 'outer; } let n = match stdout.read(&mut buf) { Ok(0) => 0, Ok(n) => n, Err(_) => 0, }; if n == 0 { // EOF / disconnect. Try to reconnect after backoff. let _ = child.kill(); let _ = child.wait(); if stop_for_decoder.load(Ordering::SeqCst) { break 'outer; } set_status(&shared_for_decoder, PlayerStatus::Buffering); std::thread::sleep(Duration::from_millis(backoff_ms)); backoff_ms = (backoff_ms * 2).min(5000); continue 'outer; } backoff_ms = 250; // Forward raw PCM bytes to cast tap (if enabled). if let Some(tx) = cast_tx_for_decoder.lock().unwrap().as_ref() { // Best-effort: never block local playback. let _ = tx.try_send(buf[..n].to_vec()); } // Convert bytes to i16 LE samples let mut i = 0usize; if let Some(b0) = leftover.take() { if n >= 1 { let b1 = buf[0]; let sample = i16::from_le_bytes([b0, b1]); if let Some(prod) = prod_opt.as_mut() { let _ = prod.push(sample); } pushed_since_start += 1; i = 1; } else { leftover = Some(b0); } } while i + 1 < n { let sample = i16::from_le_bytes([buf[i], buf[i + 1]]); if let Some(prod) = prod_opt.as_mut() { let _ = prod.push(sample); } pushed_since_start += 1; i += 2; } if i < n { leftover = Some(buf[i]); } // Move to Playing once we've decoded a small buffer. if pushed_since_start >= playing_threshold_samples { set_status(&shared_for_decoder, PlayerStatus::Playing); } } } }); let stream = if mode == PipelineMode::WithOutput { let device = device.expect("device must exist for WithOutput"); let sample_format = sample_format.expect("sample_format must exist for WithOutput"); let cfg = cfg.expect("cfg must exist for WithOutput"); let mut cons = cons_opt.take().expect("cons must exist for WithOutput"); // Audio callback: drain ring buffer and write to output. let shared_for_cb = std::sync::Arc::clone(&shared); let shared_for_cb_err = std::sync::Arc::clone(&shared_for_cb); let stop_for_cb = Arc::clone(&stop_flag); let volume_for_cb = Arc::clone(&volume_bits); let mut last_was_underrun = false; let err_fn = move |err| { let msg = format!("Audio output error: {err}"); set_error(&shared_for_cb_err, msg); }; let built = match sample_format { cpal::SampleFormat::F32 => device.build_output_stream( &cfg, move |data: &mut [f32], _| { if stop_for_cb.load(Ordering::Relaxed) { for s in data.iter_mut() { *s = 0.0; } return; } let vol = volume_from_bits(volume_for_cb.load(Ordering::Relaxed)); let mut underrun = false; for s in data.iter_mut() { if let Some(v) = cons.pop() { *s = (v as f32 / 32768.0) * vol; } else { *s = 0.0; underrun = true; } } if underrun != last_was_underrun { last_was_underrun = underrun; set_status( &shared_for_cb, if underrun { PlayerStatus::Buffering } else { PlayerStatus::Playing }, ); } }, err_fn, None, ), cpal::SampleFormat::I16 => device.build_output_stream( &cfg, move |data: &mut [i16], _| { if stop_for_cb.load(Ordering::Relaxed) { for s in data.iter_mut() { *s = 0; } return; } let vol = volume_from_bits(volume_for_cb.load(Ordering::Relaxed)); let mut underrun = false; for s in data.iter_mut() { if let Some(v) = cons.pop() { let scaled = (v as f32 * vol).clamp(i16::MIN as f32, i16::MAX as f32); *s = scaled as i16; } else { *s = 0; underrun = true; } } if underrun != last_was_underrun { last_was_underrun = underrun; set_status( &shared_for_cb, if underrun { PlayerStatus::Buffering } else { PlayerStatus::Playing }, ); } }, err_fn, None, ), cpal::SampleFormat::U16 => device.build_output_stream( &cfg, move |data: &mut [u16], _| { if stop_for_cb.load(Ordering::Relaxed) { for s in data.iter_mut() { *s = 0; } return; } let vol = volume_from_bits(volume_for_cb.load(Ordering::Relaxed)); let mut underrun = false; for s in data.iter_mut() { if let Some(v) = cons.pop() { // Convert signed i16 to unsigned with bias. let f = (v as f32 / 32768.0) * vol; let scaled = (f * 32767.0 + 32768.0).clamp(0.0, 65535.0); *s = scaled as u16; } else { *s = 0; underrun = true; } } if underrun != last_was_underrun { last_was_underrun = underrun; set_status( &shared_for_cb, if underrun { PlayerStatus::Buffering } else { PlayerStatus::Playing }, ); } }, err_fn, None, ), _ => return Err("Unsupported output sample format".to_string()), } .map_err(|e| format!("Failed to create output stream: {e}"))?; built .play() .map_err(|e| format!("Failed to start output stream: {e}"))?; Some(built) } else { None }; Ok(Self { stop_flag, volume_bits, _stream: stream, decoder_join: Some(decoder_join), cast_tx, cast_proc: None, sample_rate, channels, }) } fn start_cast_tap(&mut self, port: u16, sample_rate: u32, channels: u16) -> Result<(), String> { // Stop existing tap first. self.stop_cast_tap(); let ffmpeg = ffmpeg_command(); let ffmpeg_disp = ffmpeg.to_string_lossy(); let spawn = |codec: &str| -> Result { command_hidden(&ffmpeg) .arg("-nostdin") .arg("-hide_banner") .arg("-loglevel") .arg("warning") .arg("-f") .arg("s16le") .arg("-ac") .arg(channels.to_string()) .arg("-ar") .arg(sample_rate.to_string()) .arg("-i") .arg("pipe:0") .arg("-vn") .arg("-c:a") .arg(codec) .arg("-b:a") .arg("128k") .arg("-f") .arg("mp3") .arg("-content_type") .arg("audio/mpeg") .arg("-listen") .arg("1") .arg(format!("http://0.0.0.0:{port}/stream.mp3")) .stdin(Stdio::piped()) .stdout(Stdio::null()) .stderr(Stdio::piped()) .spawn() .map_err(|e| { format!( "Failed to start ffmpeg cast tap ({ffmpeg_disp}): {e}. Set RADIOPLAYER_FFMPEG, bundle ffmpeg next to the app, or install ffmpeg on PATH." ) }) }; let mut child = spawn("libmp3lame")?; std::thread::sleep(Duration::from_millis(150)); if let Ok(Some(status)) = child.try_wait() { if !status.success() { // Some builds lack libmp3lame; fall back to built-in encoder. child = spawn("mp3")?; } } let stdin = child .stdin .take() .ok_or_else(|| "ffmpeg cast tap stdin not available".to_string())?; let (tx, rx) = mpsc::sync_channel::>(256); *self.cast_tx.lock().unwrap() = Some(tx); let writer_join = std::thread::spawn(move || { use std::io::Write; let mut stdin = stdin; while let Ok(chunk) = rx.recv() { if chunk.is_empty() { continue; } if stdin.write_all(&chunk).is_err() { break; } } let _ = stdin.flush(); }); self.cast_proc = Some(CastTapProc { child, writer_join: Some(writer_join), }); Ok(()) } fn stop_cast_tap(&mut self) { *self.cast_tx.lock().unwrap() = None; if let Some(mut proc) = self.cast_proc.take() { let _ = proc.child.kill(); let _ = proc.child.wait(); if let Some(j) = proc.writer_join.take() { let _ = j.join(); } } } fn stop(mut self, shared: &std::sync::Arc) { self.stop_flag.store(true, Ordering::SeqCst); self.stop_cast_tap(); // dropping stream stops audio if let Some(j) = self.decoder_join.take() { let _ = j.join(); } set_status(&shared, PlayerStatus::Stopped); } fn set_volume(&self, volume: f32) { self.volume_bits.store(volume_to_bits(volume), Ordering::Relaxed); } } fn player_thread(shared: std::sync::Arc, rx: mpsc::Receiver) { // Step 2: FFmpeg decode + CPAL playback. let mut pipeline: Option = None; let mut pipeline_cast_owned = false; while let Ok(cmd) = rx.recv() { match cmd { PlayerCommand::Play { url } => { if let Some(p) = pipeline.take() { p.stop(&shared); } pipeline_cast_owned = false; { let mut s = shared.state.lock().unwrap(); s.error = None; s.url = Some(url.clone()); s.status = PlayerStatus::Buffering; } match Pipeline::start(std::sync::Arc::clone(&shared), url, PipelineMode::WithOutput) { Ok(p) => { // Apply current volume to pipeline atomics. let vol = { shared.state.lock().unwrap().volume }; p.set_volume(vol); pipeline = Some(p); } Err(e) => { set_error(&shared, e); pipeline = None; } } } PlayerCommand::PlayCast { url } => { if let Some(p) = pipeline.take() { p.stop(&shared); } pipeline_cast_owned = true; { let mut s = shared.state.lock().unwrap(); s.error = None; s.url = Some(url.clone()); s.status = PlayerStatus::Buffering; } match Pipeline::start(std::sync::Arc::clone(&shared), url, PipelineMode::Headless) { Ok(p) => { let vol = { shared.state.lock().unwrap().volume }; p.set_volume(vol); pipeline = Some(p); } Err(e) => { set_error(&shared, e); pipeline = None; } } } PlayerCommand::Stop => { if let Some(p) = pipeline.take() { p.stop(&shared); } else { let mut s = shared.state.lock().unwrap(); s.status = PlayerStatus::Stopped; s.error = None; } pipeline_cast_owned = false; } PlayerCommand::SetVolume { volume } => { let v = clamp01(volume); { let mut s = shared.state.lock().unwrap(); s.volume = v; } if let Some(p) = pipeline.as_ref() { p.set_volume(v); } } PlayerCommand::CastTapStart { port, reply } => { if let Some(p) = pipeline.as_mut() { // Current pipeline sample format is always s16le. let res = p.start_cast_tap(port, p.sample_rate, p.channels); let _ = reply.send(res); } else { let _ = reply.send(Err("No active decoder pipeline".to_string())); } } PlayerCommand::CastTapStop => { if let Some(p) = pipeline.as_mut() { p.stop_cast_tap(); } if pipeline_cast_owned { if let Some(p) = pipeline.take() { p.stop(&shared); } pipeline_cast_owned = false; } } PlayerCommand::Shutdown => break, } } if let Some(p) = pipeline.take() { p.stop(&shared); } else { set_status(&shared, PlayerStatus::Stopped); } }