fixed cast play
This commit is contained in:
@@ -2,10 +2,12 @@ use std::collections::HashMap;
|
||||
use std::io::{BufRead, BufReader};
|
||||
use std::net::{IpAddr, SocketAddr, TcpListener, TcpStream, UdpSocket};
|
||||
use std::process::{Child, Command, Stdio};
|
||||
use std::sync::{Mutex, Arc, RwLock};
|
||||
use std::thread;
|
||||
use std::sync::{Mutex, Arc};
|
||||
// thread usage replaced by async tasks; remove direct std::thread import
|
||||
use std::time::Duration;
|
||||
use tokio::sync::{RwLock as TokioRwLock, mpsc};
|
||||
|
||||
#[cfg(not(feature = "use_agnostic_mdns"))]
|
||||
use mdns_sd::{ServiceDaemon, ServiceEvent};
|
||||
use serde_json::json;
|
||||
use tauri::{AppHandle, Manager, State};
|
||||
@@ -17,7 +19,7 @@ use tauri_plugin_shell::ShellExt;
|
||||
use reqwest;
|
||||
use base64::{engine::general_purpose, Engine as _};
|
||||
|
||||
mod player;
|
||||
pub mod player;
|
||||
use player::{PlayerCommand, PlayerController, PlayerShared, PlayerState};
|
||||
|
||||
struct SidecarState {
|
||||
@@ -25,7 +27,7 @@ struct SidecarState {
|
||||
}
|
||||
|
||||
struct AppState {
|
||||
known_devices: Arc<RwLock<HashMap<String, DeviceInfo>>>,
|
||||
known_devices: Arc<TokioRwLock<HashMap<String, DeviceInfo>>>,
|
||||
}
|
||||
|
||||
struct DeviceInfo {
|
||||
@@ -87,15 +89,17 @@ fn local_ip_for_peer(peer_ip: IpAddr) -> Result<IpAddr, String> {
|
||||
Ok(sock.local_addr().map_err(|e| e.to_string())?.ip())
|
||||
}
|
||||
|
||||
fn wait_for_listen(ip: IpAddr, port: u16) {
|
||||
fn wait_for_listen(ip: IpAddr, port: u16) -> bool {
|
||||
// Best-effort: give ffmpeg a moment to bind before we tell the Chromecast.
|
||||
// Returns true if a listener accepted a connection during the wait window.
|
||||
let addr = SocketAddr::new(ip, port);
|
||||
for _ in 0..50 {
|
||||
if TcpStream::connect_timeout(&addr, Duration::from_millis(30)).is_ok() {
|
||||
return;
|
||||
return true;
|
||||
}
|
||||
std::thread::sleep(Duration::from_millis(20));
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
fn stop_cast_proxy_locked(lock: &mut Option<CastProxy>) {
|
||||
@@ -172,7 +176,7 @@ async fn cast_proxy_start(
|
||||
player::preflight_ffmpeg_only()?;
|
||||
|
||||
let device_ip_str = {
|
||||
let devices = state.known_devices.read().unwrap();
|
||||
let devices = state.known_devices.read().await;
|
||||
devices
|
||||
.get(&device_name)
|
||||
.map(|d| d.ip.clone())
|
||||
@@ -183,14 +187,6 @@ async fn cast_proxy_start(
|
||||
.map_err(|_| format!("Invalid device IP: {device_ip_str}"))?;
|
||||
let local_ip = local_ip_for_peer(device_ip)?;
|
||||
|
||||
// Pick an ephemeral port.
|
||||
let listener = TcpListener::bind("0.0.0.0:0").map_err(|e| e.to_string())?;
|
||||
let port = listener.local_addr().map_err(|e| e.to_string())?.port();
|
||||
drop(listener);
|
||||
|
||||
let host = format_http_host(local_ip);
|
||||
let proxy_url = format!("http://{host}:{port}/stream.mp3");
|
||||
|
||||
// Stop any existing standalone proxy first.
|
||||
{
|
||||
let mut lock = proxy_state.inner.lock().unwrap();
|
||||
@@ -210,63 +206,82 @@ async fn cast_proxy_start(
|
||||
.map_err(|e| e.to_string())?;
|
||||
}
|
||||
|
||||
let (reply_tx, reply_rx) = std::sync::mpsc::channel();
|
||||
let _ = player
|
||||
.controller
|
||||
.tx
|
||||
.send(PlayerCommand::CastTapStart {
|
||||
port,
|
||||
reply: reply_tx,
|
||||
})
|
||||
.map_err(|e| e.to_string())?;
|
||||
// Try starting the TAP on several ephemeral ports before falling back.
|
||||
let host = format_http_host(local_ip);
|
||||
let max_attempts = 5usize;
|
||||
for attempt in 0..max_attempts {
|
||||
// Pick an ephemeral port.
|
||||
let listener = TcpListener::bind("0.0.0.0:0").map_err(|e| e.to_string())?;
|
||||
let port = listener.local_addr().map_err(|e| e.to_string())?.port();
|
||||
drop(listener);
|
||||
|
||||
match reply_rx.recv_timeout(Duration::from_secs(2)) {
|
||||
Ok(Ok(())) => {
|
||||
wait_for_listen(local_ip, port);
|
||||
Ok(CastProxyStartResult {
|
||||
url: proxy_url,
|
||||
mode: "tap".to_string(),
|
||||
let proxy_url = format!("http://{host}:{port}/stream.mp3");
|
||||
|
||||
let (reply_tx, reply_rx) = std::sync::mpsc::channel();
|
||||
let _ = player
|
||||
.controller
|
||||
.tx
|
||||
.send(PlayerCommand::CastTapStart {
|
||||
port,
|
||||
bind_host: host.clone(),
|
||||
reply: reply_tx,
|
||||
})
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
warn!("Cast tap start failed; falling back to standalone proxy: {e}");
|
||||
let mut child = spawn_standalone_cast_proxy(url, port)?;
|
||||
if let Some(stderr) = child.stderr.take() {
|
||||
std::thread::spawn(move || {
|
||||
let reader = BufReader::new(stderr);
|
||||
for line in reader.lines().flatten() {
|
||||
warn!("[cast-proxy ffmpeg] {line}");
|
||||
}
|
||||
});
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
match reply_rx.recv_timeout(Duration::from_secs(2)) {
|
||||
Ok(Ok(())) => {
|
||||
if wait_for_listen(local_ip, port) {
|
||||
info!("Cast proxy started in TAP mode: {}", proxy_url);
|
||||
return Ok(CastProxyStartResult {
|
||||
url: proxy_url,
|
||||
mode: "tap".to_string(),
|
||||
});
|
||||
} else {
|
||||
warn!("Cast tap did not start listening on port {port}; attempt {}/{}", attempt+1, max_attempts);
|
||||
let _ = player.controller.tx.send(PlayerCommand::CastTapStop);
|
||||
std::thread::sleep(Duration::from_millis(100));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
wait_for_listen(local_ip, port);
|
||||
let mut lock = proxy_state.inner.lock().unwrap();
|
||||
*lock = Some(CastProxy { child });
|
||||
Ok(CastProxyStartResult {
|
||||
url: proxy_url,
|
||||
mode: "proxy".to_string(),
|
||||
})
|
||||
}
|
||||
Err(_) => {
|
||||
warn!("Cast tap start timed out; falling back to standalone proxy");
|
||||
let mut child = spawn_standalone_cast_proxy(url, port)?;
|
||||
if let Some(stderr) = child.stderr.take() {
|
||||
std::thread::spawn(move || {
|
||||
let reader = BufReader::new(stderr);
|
||||
for line in reader.lines().flatten() {
|
||||
warn!("[cast-proxy ffmpeg] {line}");
|
||||
}
|
||||
});
|
||||
Ok(Err(e)) => {
|
||||
warn!("Cast tap start failed on attempt {}/{}: {e}", attempt+1, max_attempts);
|
||||
let _ = player.controller.tx.send(PlayerCommand::CastTapStop);
|
||||
std::thread::sleep(Duration::from_millis(100));
|
||||
continue;
|
||||
}
|
||||
Err(_) => {
|
||||
warn!("Cast tap start timed out on attempt {}/{}", attempt+1, max_attempts);
|
||||
let _ = player.controller.tx.send(PlayerCommand::CastTapStop);
|
||||
std::thread::sleep(Duration::from_millis(100));
|
||||
continue;
|
||||
}
|
||||
wait_for_listen(local_ip, port);
|
||||
let mut lock = proxy_state.inner.lock().unwrap();
|
||||
*lock = Some(CastProxy { child });
|
||||
Ok(CastProxyStartResult {
|
||||
url: proxy_url,
|
||||
mode: "proxy".to_string(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// All TAP attempts failed; fall back to standalone proxy on a fresh ephemeral port.
|
||||
warn!("All TAP attempts failed; falling back to standalone proxy");
|
||||
let listener = TcpListener::bind("0.0.0.0:0").map_err(|e| e.to_string())?;
|
||||
let port = listener.local_addr().map_err(|e| e.to_string())?.port();
|
||||
drop(listener);
|
||||
let proxy_url = format!("http://{host}:{port}/stream.mp3");
|
||||
let mut child = spawn_standalone_cast_proxy(url, port)?;
|
||||
if let Some(stderr) = child.stderr.take() {
|
||||
std::thread::spawn(move || {
|
||||
let reader = BufReader::new(stderr);
|
||||
for line in reader.lines().flatten() {
|
||||
warn!("[cast-proxy ffmpeg] {line}");
|
||||
}
|
||||
});
|
||||
}
|
||||
// best-effort wait for standalone proxy
|
||||
let _ = wait_for_listen(local_ip, port);
|
||||
let mut lock = proxy_state.inner.lock().unwrap();
|
||||
*lock = Some(CastProxy { child });
|
||||
info!("Cast proxy started in STANDALONE mode (after TAP attempts): {}", proxy_url);
|
||||
Ok(CastProxyStartResult {
|
||||
url: proxy_url,
|
||||
mode: "proxy".to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
@@ -346,7 +361,7 @@ async fn player_stop(player: State<'_, PlayerRuntime>) -> Result<(), String> {
|
||||
|
||||
#[tauri::command]
|
||||
async fn list_cast_devices(state: State<'_, AppState>) -> Result<Vec<String>, String> {
|
||||
let devices = state.known_devices.read().unwrap();
|
||||
let devices = state.known_devices.read().await;
|
||||
let mut list: Vec<String> = devices.keys().cloned().collect();
|
||||
list.sort();
|
||||
Ok(list)
|
||||
@@ -360,12 +375,18 @@ async fn cast_play(
|
||||
device_name: String,
|
||||
url: String,
|
||||
) -> Result<(), String> {
|
||||
// Resolve device name -> ip with diagnostics on failure
|
||||
let ip = {
|
||||
let devices = state.known_devices.read().unwrap();
|
||||
devices
|
||||
.get(&device_name)
|
||||
.map(|d| d.ip.clone())
|
||||
.ok_or("Device not found")?
|
||||
let devices = state.known_devices.read().await;
|
||||
if let Some(d) = devices.get(&device_name) {
|
||||
info!("cast_play: resolved device '{}' -> {}", device_name, d.ip);
|
||||
d.ip.clone()
|
||||
} else {
|
||||
// Log known device keys for debugging
|
||||
let keys: Vec<String> = devices.keys().cloned().collect();
|
||||
warn!("cast_play: device '{}' not found; known: {:?}", device_name, keys);
|
||||
return Err(format!("Device not found: {} (known: {:?})", device_name, keys));
|
||||
}
|
||||
};
|
||||
|
||||
let mut lock = sidecar_state.child.lock().unwrap();
|
||||
@@ -375,11 +396,25 @@ async fn cast_play(
|
||||
child
|
||||
} else {
|
||||
info!("Spawning new sidecar...");
|
||||
// Use the packaged sidecar binary (radiocast-sidecar-<target>.exe)
|
||||
let sidecar_command = app
|
||||
.shell()
|
||||
.sidecar("radiocast-sidecar")
|
||||
.map_err(|e| e.to_string())?;
|
||||
let (mut rx, child) = sidecar_command.spawn().map_err(|e| e.to_string())?;
|
||||
.map_err(|e| {
|
||||
error!("Sidecar command creation failed: {}", e);
|
||||
e.to_string()
|
||||
})?;
|
||||
let spawn_result = sidecar_command.spawn();
|
||||
let (mut rx, child) = match spawn_result {
|
||||
Ok(res) => {
|
||||
info!("Sidecar spawned successfully");
|
||||
res
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Sidecar spawn failed: {}", e);
|
||||
return Err(e.to_string());
|
||||
}
|
||||
};
|
||||
|
||||
tauri::async_runtime::spawn(async move {
|
||||
while let Some(event) = rx.recv().await {
|
||||
@@ -403,10 +438,15 @@ async fn cast_play(
|
||||
"command": "play",
|
||||
"args": { "ip": ip, "url": url }
|
||||
});
|
||||
|
||||
child
|
||||
.write(format!("{}\n", play_cmd.to_string()).as_bytes())
|
||||
.map_err(|e| e.to_string())?;
|
||||
let play_payload = format!("{}\n", play_cmd.to_string());
|
||||
info!("Sending cast URL to device '{}': {}", device_name, url);
|
||||
match child.write(play_payload.as_bytes()) {
|
||||
Ok(()) => info!("Sidecar write OK"),
|
||||
Err(e) => {
|
||||
error!("Sidecar write failed: {}", e);
|
||||
return Err(e.to_string());
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -539,7 +579,7 @@ pub fn run() {
|
||||
})
|
||||
.setup(|app| {
|
||||
app.manage(AppState {
|
||||
known_devices: Arc::new(RwLock::new(HashMap::new())),
|
||||
known_devices: Arc::new(TokioRwLock::new(HashMap::new())),
|
||||
});
|
||||
app.manage(SidecarState {
|
||||
child: Mutex::new(None),
|
||||
@@ -560,69 +600,131 @@ pub fn run() {
|
||||
app.manage(PlayerRuntime { shared, controller });
|
||||
|
||||
let handle = app.handle().clone();
|
||||
let mdns_handle = handle.clone();
|
||||
thread::spawn(move || {
|
||||
let mdns = ServiceDaemon::new().expect("Failed to create daemon");
|
||||
let receiver = mdns
|
||||
.browse("_googlecast._tcp.local.")
|
||||
.expect("Failed to browse");
|
||||
while let Ok(event) = receiver.recv() {
|
||||
match event {
|
||||
ServiceEvent::ServiceResolved(info) => {
|
||||
let name = info
|
||||
.get_property_val_str("fn")
|
||||
.or_else(|| Some(info.get_fullname()))
|
||||
.unwrap()
|
||||
.to_string();
|
||||
let addresses = info.get_addresses();
|
||||
let ip = addresses
|
||||
.iter()
|
||||
.find(|ip| ip.is_ipv4())
|
||||
.or_else(|| addresses.iter().next());
|
||||
|
||||
if let Some(ip) = ip {
|
||||
let state = mdns_handle.state::<AppState>();
|
||||
let mut devices = state.known_devices.write().unwrap();
|
||||
let ip_str = ip.to_string();
|
||||
let now = std::time::Instant::now();
|
||||
if !devices.contains_key(&name) {
|
||||
// new device discovered
|
||||
let info = DeviceInfo { ip: ip_str.clone(), last_seen: now };
|
||||
devices.insert(name.clone(), info);
|
||||
let _ = mdns_handle.emit("cast-device-discovered", json!({"name": name, "ip": ip_str}));
|
||||
} else {
|
||||
// update last_seen and possibly IP
|
||||
if let Some(d) = devices.get_mut(&name) {
|
||||
d.last_seen = now;
|
||||
d.ip = ip_str;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
// Bridge blocking mdns-sd into async device handling via an unbounded channel.
|
||||
let mdns_handle = handle.clone();
|
||||
let (mdns_tx, mut mdns_rx) = mpsc::unbounded_channel::<(String, String)>();
|
||||
|
||||
// Task: consume events from the channel and update `known_devices` asynchronously.
|
||||
let consumer_handle = mdns_handle.clone();
|
||||
tauri::async_runtime::spawn(async move {
|
||||
while let Some((name, ip_str)) = mdns_rx.recv().await {
|
||||
let state = consumer_handle.state::<AppState>();
|
||||
let mut devices = state.known_devices.write().await;
|
||||
let now = std::time::Instant::now();
|
||||
if !devices.contains_key(&name) {
|
||||
let info = DeviceInfo { ip: ip_str.clone(), last_seen: now };
|
||||
devices.insert(name.clone(), info);
|
||||
let _ = consumer_handle.emit("cast-device-discovered", json!({"name": name, "ip": ip_str}));
|
||||
} else if let Some(d) = devices.get_mut(&name) {
|
||||
d.last_seen = now;
|
||||
d.ip = ip_str;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Spawn a GC thread to drop stale devices and notify frontend
|
||||
let gc_handle = handle.clone();
|
||||
thread::spawn(move || {
|
||||
let stale_after = Duration::from_secs(30);
|
||||
loop {
|
||||
std::thread::sleep(Duration::from_secs(10));
|
||||
let state = gc_handle.state::<AppState>();
|
||||
let mut devices = state.known_devices.write().unwrap();
|
||||
let now = std::time::Instant::now();
|
||||
let mut removed: Vec<String> = Vec::new();
|
||||
devices.retain(|name, info| {
|
||||
if now.duration_since(info.last_seen) > stale_after {
|
||||
removed.push(name.clone());
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
// Probe implementation:
|
||||
// - If the feature `use_agnostic_mdns` is enabled, use the async `agnostic-mdns` API.
|
||||
// - Otherwise keep the existing blocking `mdns-sd` browse running in a blocking task.
|
||||
let probe_tx = mdns_tx.clone();
|
||||
|
||||
#[cfg(feature = "use_agnostic_mdns")]
|
||||
{
|
||||
// Use agnostic-mdns async API (tokio) to query for Google Cast services
|
||||
tauri::async_runtime::spawn(async move {
|
||||
// Create the async channel expected by agnostic-mdns query
|
||||
let (tx, rx) = agnostic_mdns::tokio::channel::unbounded::<agnostic_mdns::worksteal::ServiceEntry>();
|
||||
|
||||
// Build query params for _googlecast._tcp in the local domain.
|
||||
let params = agnostic_mdns::QueryParam::new("_googlecast._tcp".into())
|
||||
.with_domain("local.".into());
|
||||
|
||||
// Spawn the query task which will send ServiceEntry values into `tx`.
|
||||
let _ = tokio::spawn(async move {
|
||||
let _ = agnostic_mdns::tokio::query(params, tx).await;
|
||||
});
|
||||
drop(devices);
|
||||
|
||||
// Consume ServiceEntry results and forward (name, ip) into the probe channel.
|
||||
let rx = rx;
|
||||
while let Ok(entry) = rx.recv().await {
|
||||
// Try TXT records for friendly name: entries like "fn=Living Room".
|
||||
let mut friendly: Option<String> = None;
|
||||
for s in entry.txt() {
|
||||
let s_str = s.to_string();
|
||||
if let Some(rest) = s_str.strip_prefix("fn=") {
|
||||
friendly = Some(rest.to_string());
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback: use debug-formatted entry name if TXT 'fn' not present.
|
||||
// This avoids depending on the concrete return type of `name()`.
|
||||
let name = friendly.unwrap_or_else(|| format!("{:?}", entry.name()));
|
||||
|
||||
// Prefer IPv4, then IPv6.
|
||||
let ip_opt = entry
|
||||
.ipv4_addr()
|
||||
.map(|a| a.to_string())
|
||||
.or_else(|| entry.ipv6_addr().map(|a| a.to_string()));
|
||||
|
||||
if let Some(ip_str) = ip_opt {
|
||||
let _ = probe_tx.send((name, ip_str));
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "use_agnostic_mdns"))]
|
||||
{
|
||||
// Offload blocking mdns-sd browse loop to a blocking thread and forward events over the channel.
|
||||
tauri::async_runtime::spawn(async move {
|
||||
let _ = tokio::task::spawn_blocking(move || {
|
||||
let mdns = ServiceDaemon::new().expect("Failed to create daemon");
|
||||
let receiver = mdns
|
||||
.browse("_googlecast._tcp.local.")
|
||||
.expect("Failed to browse");
|
||||
while let Ok(event) = receiver.recv() {
|
||||
if let ServiceEvent::ServiceResolved(info) = event {
|
||||
let name = info
|
||||
.get_property_val_str("fn")
|
||||
.or_else(|| Some(info.get_fullname()))
|
||||
.unwrap()
|
||||
.to_string();
|
||||
let addresses = info.get_addresses();
|
||||
let ip = addresses
|
||||
.iter()
|
||||
.find(|ip| ip.is_ipv4())
|
||||
.or_else(|| addresses.iter().next());
|
||||
if let Some(ip) = ip {
|
||||
let ip_str = ip.to_string();
|
||||
// Best-effort send into the async channel; ignore if receiver dropped.
|
||||
let _ = probe_tx.send((name, ip_str));
|
||||
}
|
||||
}
|
||||
}
|
||||
}).await;
|
||||
});
|
||||
}
|
||||
|
||||
// Spawn an async GC task to drop stale devices and notify frontend
|
||||
let gc_handle = handle.clone();
|
||||
tauri::async_runtime::spawn(async move {
|
||||
let stale_after = Duration::from_secs(30);
|
||||
let mut interval = tokio::time::interval(Duration::from_secs(10));
|
||||
loop {
|
||||
interval.tick().await;
|
||||
let state = gc_handle.state::<AppState>();
|
||||
let mut devices = state.known_devices.write().await;
|
||||
let now = std::time::Instant::now();
|
||||
let mut removed: Vec<String> = Vec::new();
|
||||
devices.retain(|name, info| {
|
||||
if now.duration_since(info.last_seen) > stale_after {
|
||||
removed.push(name.clone());
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
});
|
||||
for name in removed {
|
||||
let _ = gc_handle.emit("cast-device-removed", json!({"name": name}));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user