first step

This commit is contained in:
2026-01-11 10:30:54 +01:00
parent f9b9ce0994
commit 34c3f0dc89
7 changed files with 1105 additions and 30 deletions

509
src-tauri/src/player.rs Normal file
View File

@@ -0,0 +1,509 @@
use serde::Serialize;
use std::io::Read;
use std::process::{Command, Stdio};
use std::ffi::OsString;
use std::sync::{
atomic::{AtomicBool, AtomicU32, Ordering},
mpsc, Arc, Mutex,
};
use std::time::Duration;
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use ringbuf::HeapRb;
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum PlayerStatus {
Idle,
Buffering,
Playing,
Stopped,
Error,
}
#[derive(Debug, Clone, Serialize)]
pub struct PlayerState {
pub status: PlayerStatus,
pub url: Option<String>,
pub volume: f32,
pub error: Option<String>,
}
impl Default for PlayerState {
fn default() -> Self {
Self {
status: PlayerStatus::Idle,
url: None,
volume: 0.5,
error: None,
}
}
}
pub struct PlayerShared {
pub state: Mutex<PlayerState>,
}
impl PlayerShared {
pub fn snapshot(&self) -> PlayerState {
self.state.lock().unwrap().clone()
}
}
#[derive(Debug)]
pub enum PlayerCommand {
Play { url: String },
Stop,
SetVolume { volume: f32 },
Shutdown,
}
#[derive(Clone)]
pub struct PlayerController {
pub tx: mpsc::Sender<PlayerCommand>,
}
pub fn spawn_player_thread(shared: &'static PlayerShared) -> PlayerController {
let (tx, rx) = mpsc::channel::<PlayerCommand>();
std::thread::spawn(move || player_thread(shared, rx));
PlayerController { tx }
}
fn clamp01(v: f32) -> f32 {
if v.is_nan() {
0.0
} else if v < 0.0 {
0.0
} else if v > 1.0 {
1.0
} else {
v
}
}
fn volume_to_bits(v: f32) -> u32 {
clamp01(v).to_bits()
}
fn volume_from_bits(bits: u32) -> f32 {
f32::from_bits(bits)
}
fn set_status(shared: &'static PlayerShared, status: PlayerStatus) {
let mut s = shared.state.lock().unwrap();
if s.status != status {
s.status = status;
}
}
fn set_error(shared: &'static PlayerShared, message: String) {
let mut s = shared.state.lock().unwrap();
s.status = PlayerStatus::Error;
s.error = Some(message);
}
fn ffmpeg_command() -> OsString {
// Step 2: external ffmpeg binary.
// Lookup order:
// 1) RADIOPLAYER_FFMPEG (absolute or relative)
// 2) ffmpeg next to the application executable
// 3) PATH lookup (ffmpeg / ffmpeg.exe)
if let Ok(p) = std::env::var("RADIOPLAYER_FFMPEG") {
if !p.trim().is_empty() {
return OsString::from(p);
}
}
let local_name = if cfg!(windows) { "ffmpeg.exe" } else { "ffmpeg" };
if let Ok(exe) = std::env::current_exe() {
if let Some(dir) = exe.parent() {
let candidate = dir.join(local_name);
if candidate.exists() {
return candidate.into_os_string();
}
}
}
OsString::from(local_name)
}
struct Pipeline {
stop_flag: Arc<AtomicBool>,
volume_bits: Arc<AtomicU32>,
_stream: cpal::Stream,
decoder_join: Option<std::thread::JoinHandle<()>>,
}
impl Pipeline {
fn start(shared: &'static PlayerShared, url: String) -> Result<Self, String> {
let host = cpal::default_host();
let device = host
.default_output_device()
.ok_or_else(|| "No default audio output device".to_string())?;
let default_cfg = device
.default_output_config()
.map_err(|e| format!("Failed to get output config: {e}"))?;
let sample_format = default_cfg.sample_format();
let cfg = default_cfg.config();
let sample_rate = cfg.sample_rate.0;
let channels = cfg.channels as u16;
// 5 seconds of PCM buffering (i16 samples)
let capacity_samples = (sample_rate as usize)
.saturating_mul(cfg.channels as usize)
.saturating_mul(5);
let rb = HeapRb::<i16>::new(capacity_samples);
let (mut prod, mut cons) = rb.split();
let stop_flag = Arc::new(AtomicBool::new(false));
let volume_bits = Arc::new(AtomicU32::new({
let s = shared.state.lock().unwrap();
volume_to_bits(s.volume)
}));
// Decoder thread: spawns ffmpeg, reads PCM, writes into ring buffer.
let stop_for_decoder = Arc::clone(&stop_flag);
let shared_for_decoder = shared;
let decoder_url = url.clone();
let decoder_join = std::thread::spawn(move || {
let mut backoff_ms: u64 = 250;
let mut pushed_since_start: usize = 0;
let playing_threshold_samples = (sample_rate as usize)
.saturating_mul(cfg.channels as usize)
.saturating_div(4); // ~250ms
'outer: loop {
if stop_for_decoder.load(Ordering::SeqCst) {
break;
}
set_status(shared_for_decoder, PlayerStatus::Buffering);
let ffmpeg = ffmpeg_command();
let ffmpeg_disp = ffmpeg.to_string_lossy();
let mut child = match Command::new(&ffmpeg)
.arg("-nostdin")
.arg("-hide_banner")
.arg("-loglevel")
.arg("warning")
// basic reconnect flags (best-effort; not all protocols honor these)
.arg("-reconnect")
.arg("1")
.arg("-reconnect_streamed")
.arg("1")
.arg("-reconnect_delay_max")
.arg("5")
.arg("-i")
.arg(&decoder_url)
.arg("-vn")
.arg("-ac")
.arg(channels.to_string())
.arg("-ar")
.arg(sample_rate.to_string())
.arg("-f")
.arg("s16le")
.arg("pipe:1")
.stdout(Stdio::piped())
.stderr(Stdio::null())
.spawn()
{
Ok(c) => c,
Err(e) => {
// If ffmpeg isn't available, this is a hard failure.
set_error(
shared_for_decoder,
format!(
"Failed to start ffmpeg ({ffmpeg_disp}): {e}. Set RADIOPLAYER_FFMPEG, bundle ffmpeg next to the app, or install ffmpeg on PATH."
),
);
break;
}
};
let mut stdout = match child.stdout.take() {
Some(s) => s,
None => {
set_error(shared_for_decoder, "ffmpeg stdout not available".to_string());
let _ = child.kill();
break;
}
};
let mut buf = [0u8; 8192];
let mut leftover: Option<u8> = None;
loop {
if stop_for_decoder.load(Ordering::SeqCst) {
let _ = child.kill();
let _ = child.wait();
break 'outer;
}
let n = match stdout.read(&mut buf) {
Ok(0) => 0,
Ok(n) => n,
Err(_) => 0,
};
if n == 0 {
// EOF / disconnect. Try to reconnect after backoff.
let _ = child.kill();
let _ = child.wait();
if stop_for_decoder.load(Ordering::SeqCst) {
break 'outer;
}
set_status(shared_for_decoder, PlayerStatus::Buffering);
std::thread::sleep(Duration::from_millis(backoff_ms));
backoff_ms = (backoff_ms * 2).min(5000);
continue 'outer;
}
backoff_ms = 250;
// Convert bytes to i16 LE samples
let mut i = 0usize;
if let Some(b0) = leftover.take() {
if n >= 1 {
let b1 = buf[0];
let sample = i16::from_le_bytes([b0, b1]);
let _ = prod.push(sample);
pushed_since_start += 1;
i = 1;
} else {
leftover = Some(b0);
}
}
while i + 1 < n {
let sample = i16::from_le_bytes([buf[i], buf[i + 1]]);
if prod.push(sample).is_ok() {
pushed_since_start += 1;
}
i += 2;
}
if i < n {
leftover = Some(buf[i]);
}
// Move to Playing once we've decoded a small buffer.
if pushed_since_start >= playing_threshold_samples {
set_status(shared_for_decoder, PlayerStatus::Playing);
}
}
}
});
// Audio callback: drain ring buffer and write to output.
let shared_for_cb = shared;
let stop_for_cb = Arc::clone(&stop_flag);
let volume_for_cb = Arc::clone(&volume_bits);
let mut last_was_underrun = false;
let err_fn = move |err| {
let msg = format!("Audio output error: {err}");
set_error(shared_for_cb, msg);
};
let stream = match sample_format {
cpal::SampleFormat::F32 => device.build_output_stream(
&cfg,
move |data: &mut [f32], _| {
if stop_for_cb.load(Ordering::Relaxed) {
for s in data.iter_mut() {
*s = 0.0;
}
return;
}
let vol = volume_from_bits(volume_for_cb.load(Ordering::Relaxed));
let mut underrun = false;
for s in data.iter_mut() {
if let Some(v) = cons.pop() {
*s = (v as f32 / 32768.0) * vol;
} else {
*s = 0.0;
underrun = true;
}
}
if underrun != last_was_underrun {
last_was_underrun = underrun;
set_status(
shared_for_cb,
if underrun {
PlayerStatus::Buffering
} else {
PlayerStatus::Playing
},
);
}
},
err_fn,
None,
),
cpal::SampleFormat::I16 => device.build_output_stream(
&cfg,
move |data: &mut [i16], _| {
if stop_for_cb.load(Ordering::Relaxed) {
for s in data.iter_mut() {
*s = 0;
}
return;
}
let vol = volume_from_bits(volume_for_cb.load(Ordering::Relaxed));
let mut underrun = false;
for s in data.iter_mut() {
if let Some(v) = cons.pop() {
let scaled = (v as f32 * vol).clamp(i16::MIN as f32, i16::MAX as f32);
*s = scaled as i16;
} else {
*s = 0;
underrun = true;
}
}
if underrun != last_was_underrun {
last_was_underrun = underrun;
set_status(
shared_for_cb,
if underrun {
PlayerStatus::Buffering
} else {
PlayerStatus::Playing
},
);
}
},
err_fn,
None,
),
cpal::SampleFormat::U16 => device.build_output_stream(
&cfg,
move |data: &mut [u16], _| {
if stop_for_cb.load(Ordering::Relaxed) {
for s in data.iter_mut() {
*s = 0;
}
return;
}
let vol = volume_from_bits(volume_for_cb.load(Ordering::Relaxed));
let mut underrun = false;
for s in data.iter_mut() {
if let Some(v) = cons.pop() {
// Convert signed i16 to unsigned with bias.
let f = (v as f32 / 32768.0) * vol;
let scaled = (f * 32767.0 + 32768.0).clamp(0.0, 65535.0);
*s = scaled as u16;
} else {
*s = 0;
underrun = true;
}
}
if underrun != last_was_underrun {
last_was_underrun = underrun;
set_status(
shared_for_cb,
if underrun {
PlayerStatus::Buffering
} else {
PlayerStatus::Playing
},
);
}
},
err_fn,
None,
),
_ => return Err("Unsupported output sample format".to_string()),
}
.map_err(|e| format!("Failed to create output stream: {e}"))?;
stream
.play()
.map_err(|e| format!("Failed to start output stream: {e}"))?;
Ok(Self {
stop_flag,
volume_bits,
_stream: stream,
decoder_join: Some(decoder_join),
})
}
fn stop(mut self, shared: &'static PlayerShared) {
self.stop_flag.store(true, Ordering::SeqCst);
// dropping stream stops audio
if let Some(j) = self.decoder_join.take() {
let _ = j.join();
}
set_status(shared, PlayerStatus::Stopped);
}
fn set_volume(&self, volume: f32) {
self.volume_bits.store(volume_to_bits(volume), Ordering::Relaxed);
}
}
fn player_thread(shared: &'static PlayerShared, rx: mpsc::Receiver<PlayerCommand>) {
// Step 2: FFmpeg decode + CPAL playback.
let mut pipeline: Option<Pipeline> = None;
while let Ok(cmd) = rx.recv() {
match cmd {
PlayerCommand::Play { url } => {
if let Some(p) = pipeline.take() {
p.stop(shared);
}
{
let mut s = shared.state.lock().unwrap();
s.error = None;
s.url = Some(url.clone());
s.status = PlayerStatus::Buffering;
}
match Pipeline::start(shared, url) {
Ok(p) => {
// Apply current volume to pipeline atomics.
let vol = { shared.state.lock().unwrap().volume };
p.set_volume(vol);
pipeline = Some(p);
}
Err(e) => {
set_error(shared, e);
pipeline = None;
}
}
}
PlayerCommand::Stop => {
if let Some(p) = pipeline.take() {
p.stop(shared);
} else {
let mut s = shared.state.lock().unwrap();
s.status = PlayerStatus::Stopped;
s.error = None;
}
}
PlayerCommand::SetVolume { volume } => {
let v = clamp01(volume);
{
let mut s = shared.state.lock().unwrap();
s.volume = v;
}
if let Some(p) = pipeline.as_ref() {
p.set_volume(v);
}
}
PlayerCommand::Shutdown => break,
}
}
if let Some(p) = pipeline.take() {
p.stop(shared);
} else {
set_status(shared, PlayerStatus::Stopped);
}
}