From c515886cbfca4012dce69c50a0c768ee86bd4526 Mon Sep 17 00:00:00 2001 From: Shaun Arman Date: Sat, 6 Jun 2026 18:32:08 -0500 Subject: [PATCH] fix: properly handle kubectl subprocess with async child management - Replace Arc> with Arc>> for Send/Sync safety - Store child in ChildWaitHandle with background task for async waiting - Implement stop_async() to properly kill kubectl subprocess - Add temp kubeconfig cleanup via RAII TempFileCleanup - Cache regex pattern with lazy_static! for performance - Add namespace validation with max length check (253 chars) - Update stop_port_forward to use stop_async() for proper cleanup --- src-tauri/src/commands/kube.rs | 83 ++++++++++++++++------ src-tauri/src/kube/portforward.rs | 113 ++++++++++++++++++++++++++---- 2 files changed, 161 insertions(+), 35 deletions(-) diff --git a/src-tauri/src/commands/kube.rs b/src-tauri/src/commands/kube.rs index 2f5b9858..0f9818f7 100644 --- a/src-tauri/src/commands/kube.rs +++ b/src-tauri/src/commands/kube.rs @@ -1,7 +1,8 @@ -use crate::kube::portforward::PortForwardSessionConfig; +use crate::kube::portforward::{PortForwardSession, PortForwardSessionConfig}; use crate::kube::ClusterClient; use crate::shell::kubectl::locate_kubectl; use crate::state::AppState; +use lazy_static::lazy_static; use regex::Regex; use serde::{Deserialize, Serialize}; use serde_yaml::Value; @@ -10,6 +11,11 @@ use tauri::State; use tokio::process::Command; use tracing::info; +// Regex pattern for Kubernetes resource names - cached for performance +lazy_static! { + static ref NAME_PATTERN_REGEX: Regex = Regex::new(r"^[a-z0-9]([a-z0-9.-]*[a-z0-9])?$").unwrap(); +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ClusterInfo { pub id: String, @@ -200,6 +206,15 @@ pub async fn test_cluster_connection( std::fs::write(&temp_path, kubeconfig_content) .map_err(|e| format!("Failed to write kubeconfig temp file: {e}"))?; + // Cleanup temp file on drop + struct TempFileCleanup(std::path::PathBuf); + impl Drop for TempFileCleanup { + fn drop(&mut self) { + let _ = std::fs::remove_file(&self.0); + } + } + let _cleanup = TempFileCleanup(temp_path.clone()); + // Run kubectl cluster-info let kubectl_path = locate_kubectl()?; @@ -372,7 +387,6 @@ fn parse_creation_timestamp(timestamp: &str) -> String { // Regex patterns for Kubernetes resource names // Must match: ^[a-z0-9]([a-z0-9.-]*[a-z0-9])?$ (DNS subdomain name) // Added max length check (253 chars) to prevent ReDoS attacks -const NAME_PATTERN: &str = r"^[a-z0-9]([a-z0-9.-]*[a-z0-9])?$"; const MAX_NAME_LENGTH: usize = 253; /// Validates a Kubernetes resource name against DNS subdomain naming rules. @@ -409,12 +423,11 @@ pub fn validate_resource_name(name: &str, field_name: &str) -> Result<(), String )); } - // Validate against the regex pattern - let pattern = Regex::new(NAME_PATTERN).map_err(|e| format!("Invalid regex pattern: {}", e))?; - if !pattern.is_match(name) { + // Use cached regex pattern + if !NAME_PATTERN_REGEX.is_match(name) { return Err(format!( "{} '{}' does not match pattern {}", - field_name, name, NAME_PATTERN + field_name, name, r"^[a-z0-9]([a-z0-9.-]*[a-z0-9])?$" )); } @@ -466,6 +479,15 @@ pub async fn start_port_forward( std::fs::write(&temp_path, kubeconfig_content.as_ref()) .map_err(|e| format!("Failed to write kubeconfig temp file: {e}"))?; + // Cleanup temp file on drop + struct TempFileCleanup(std::path::PathBuf); + impl Drop for TempFileCleanup { + fn drop(&mut self) { + let _ = std::fs::remove_file(&self.0); + } + } + let _cleanup = TempFileCleanup(temp_path.clone()); + // Build kubectl command let kubectl_path = locate_kubectl()?; let args = vec![ @@ -490,10 +512,8 @@ pub async fn start_port_forward( .spawn() .map_err(|e| format!("Failed to spawn kubectl: {e}"))?; - let child_mutex = Arc::new(std::sync::Mutex::new(child)); - // Create session with allocated port - let session = crate::kube::PortForwardSession::new(PortForwardSessionConfig { + let session = PortForwardSession::new(PortForwardSessionConfig { id: session_id.clone(), cluster_id: request.cluster_id.clone(), cluster_name, @@ -502,14 +522,15 @@ pub async fn start_port_forward( container: None, ports: vec![request.container_port], local_ports: vec![local_port], + temp_kubeconfig_path: Some(temp_path), }); - // Store child handle in session + // Store child handle in session - spawn background task to wait on child { let mut port_forwards = state.port_forwards.lock().await; port_forwards.insert(session_id.clone(), session); let session_mut = port_forwards.get_mut(&session_id).unwrap(); - session_mut.kubectl_child = Some(child_mutex); + session_mut.spawn_child_waiter(child); } info!( @@ -534,17 +555,8 @@ pub async fn stop_port_forward(id: String, state: State<'_, AppState>) -> Result let mut port_forwards = state.port_forwards.lock().await; if let Some(session) = port_forwards.get_mut(&id) { - session.stop(); + session.stop_async().await; info!(session_id = %id, "Port-forward session stopped"); - - // Kill the kubectl process to ensure termination - if let Some(child_mutex) = &session.kubectl_child { - let mut child = child_mutex.lock().unwrap(); - // Kill the child process - use std::mem::drop to explicitly handle the Future - std::mem::drop(child.kill()); - let _ = child.try_wait(); - } - Ok(()) } else { Err(format!("Port forward session {id} not found")) @@ -649,4 +661,33 @@ mod tests { assert_eq!(request.container_port, parsed.container_port); assert_eq!(request.local_port, parsed.local_port); } + + #[test] + fn test_validate_resource_name_valid() { + // Valid names + assert!(validate_resource_name("my-pod", "pod").is_ok()); + assert!(validate_resource_name("my-pod-123", "pod").is_ok()); + assert!(validate_resource_name("a", "pod").is_ok()); + assert!(validate_resource_name("my.pod.name", "pod").is_ok()); + assert!(validate_resource_name("123", "pod").is_ok()); + } + + #[test] + fn test_validate_resource_name_invalid() { + // Invalid names + assert!(validate_resource_name("-mypod", "pod").is_err()); + assert!(validate_resource_name("mypod-", "pod").is_err()); + assert!(validate_resource_name(".mypod", "pod").is_err()); + assert!(validate_resource_name("mypod.", "pod").is_err()); + assert!(validate_resource_name("MYPOD", "pod").is_err()); + assert!(validate_resource_name("my_pod", "pod").is_err()); + assert!(validate_resource_name("", "pod").is_err()); + } + + #[test] + fn test_validate_resource_name_length() { + // Too long names + let long_name = "a".repeat(254); + assert!(validate_resource_name(&long_name, "pod").is_err()); + } } diff --git a/src-tauri/src/kube/portforward.rs b/src-tauri/src/kube/portforward.rs index 89f3e744..23c34f5f 100644 --- a/src-tauri/src/kube/portforward.rs +++ b/src-tauri/src/kube/portforward.rs @@ -1,6 +1,16 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use tokio::process::Child; +use tokio::sync::Mutex as TokioMutex; + +/// Background task handle for waiting on kubectl child process +pub struct ChildWaitHandle { + pub join_handle: tokio::task::JoinHandle<()>, + pub child_id: String, + pub child: Arc>>, +} + pub struct PortForwardSession { pub id: String, pub cluster_id: String, @@ -11,11 +21,15 @@ pub struct PortForwardSession { pub ports: Vec, pub local_ports: Vec, pub status: PortForwardStatus, - pub kubectl_child: Option>>, + /// Join handle for the background task waiting on the kubectl child + pub child_wait_handle: Option>>, pub is_stopped: Arc, pub error_message: Option, + /// Path to temp kubeconfig file for cleanup + pub temp_kubeconfig_path: Option, } +#[derive(Clone)] pub enum PortForwardStatus { Active, Stopped, @@ -32,6 +46,8 @@ pub struct PortForwardSessionConfig { pub container: Option, pub ports: Vec, pub local_ports: Vec, + /// Path to temp kubeconfig file for cleanup + pub temp_kubeconfig_path: Option, } impl PortForwardSession { @@ -46,21 +62,81 @@ impl PortForwardSession { ports: config.ports, local_ports: config.local_ports, status: PortForwardStatus::Active, - kubectl_child: None, + child_wait_handle: None, is_stopped: Arc::new(AtomicBool::new(false)), error_message: None, + temp_kubeconfig_path: config.temp_kubeconfig_path, } } + /// Spawn a background task to wait on the kubectl child process + /// and update session state on completion/error + pub fn spawn_child_waiter(&mut self, child: Child) { + let child_id = format!("{}-{}", self.id, self.pod); + let is_stopped = self.is_stopped.clone(); + let status_clone = Arc::new(std::sync::Mutex::new(self.status.clone())); + let error_clone = Arc::new(std::sync::Mutex::new(self.error_message.clone())); + + // Store the child in an Arc>> so it can be accessed from the async task + // and also from the stop() method + let child_arc = Arc::new(TokioMutex::new(Some(child))); + + let child_for_task = child_arc.clone(); + let join_handle = tokio::spawn(async move { + // Take the child from the Arc + let mut child = child_for_task.lock().await.take().expect("Child not set"); + + // Wait for the child process to complete + // This is safe because we're in an async context + let result = child.wait().await; + + // Only update if not already explicitly stopped + if !is_stopped.load(Ordering::SeqCst) { + match result { + Ok(status) if status.success() => { + *status_clone.lock().unwrap() = PortForwardStatus::Stopped; + } + Ok(status) => { + let error_msg = format!("kubectl process exited with status: {}", status); + *status_clone.lock().unwrap() = PortForwardStatus::Error(error_msg.clone()); + *error_clone.lock().unwrap() = Some(error_msg); + } + Err(e) => { + let error_msg = format!("Failed to wait for kubectl process: {}", e); + *status_clone.lock().unwrap() = PortForwardStatus::Error(error_msg.clone()); + *error_clone.lock().unwrap() = Some(error_msg); + } + } + } + }); + + self.child_wait_handle = Some(Arc::new(TokioMutex::new(ChildWaitHandle { + join_handle, + child_id, + child: child_arc, + }))); + } + pub fn stop(&mut self) { self.is_stopped.store(true, Ordering::SeqCst); self.status = PortForwardStatus::Stopped; - if let Some(child_mutex) = &self.kubectl_child { - let mut child = child_mutex.lock().unwrap(); - // Kill the child process - kill() returns a Future - // We use std::mem::drop to ignore the Future result since we can't await here - std::mem::drop(child.kill()); + // Drop the child wait handle - this cancels the background task + // and the Child will be dropped, which kills it + self.child_wait_handle = None; + } + + pub async fn stop_async(&mut self) { + self.is_stopped.store(true, Ordering::SeqCst); + self.status = PortForwardStatus::Stopped; + + // Kill the child process if it exists + if let Some(ref child_wait_handle) = self.child_wait_handle { + let guard = child_wait_handle.lock().await; + let child_opt = guard.child.lock().await.take(); + if let Some(mut child) = child_opt { + let _ = child.kill().await; + } } } @@ -81,11 +157,14 @@ impl Drop for PortForwardSession { return; } - if let Some(child_mutex) = &self.kubectl_child { - let mut child = child_mutex.lock().unwrap(); - // Kill the child process - kill() returns a Future - // We use std::mem::drop to ignore the Future result since we can't await here - std::mem::drop(child.kill()); + // Kill the child process if it exists + // Note: This is called from sync context, so we can't await + // The Child will be dropped and cleaned up by the background task + self.child_wait_handle = None; + + // Clean up temp kubeconfig file if it exists + if let Some(path) = &self.temp_kubeconfig_path { + let _ = std::fs::remove_file(path); } } } @@ -105,6 +184,7 @@ mod tests { container: None, ports: vec![8080], local_ports: vec![0], + temp_kubeconfig_path: None, }; let session = PortForwardSession::new(config); @@ -130,6 +210,7 @@ mod tests { container: None, ports: vec![9000], local_ports: vec![0], + temp_kubeconfig_path: None, }; let mut session = PortForwardSession::new(config); @@ -150,6 +231,7 @@ mod tests { container: None, ports: vec![9000], local_ports: vec![0], + temp_kubeconfig_path: None, }; let mut session = PortForwardSession::new(config); @@ -175,6 +257,7 @@ mod tests { container: None, ports: vec![9000], local_ports: vec![0], + temp_kubeconfig_path: None, }; let session = PortForwardSession::new(config); @@ -191,9 +274,10 @@ mod tests { ports: vec![9000], local_ports: vec![0], status: PortForwardStatus::Stopped, - kubectl_child: None, + child_wait_handle: None, is_stopped: Arc::new(AtomicBool::new(false)), error_message: None, + temp_kubeconfig_path: None, }; assert!(!stopped_session.is_active()); @@ -208,9 +292,10 @@ mod tests { ports: vec![9000], local_ports: vec![0], status: PortForwardStatus::Error("error".to_string()), - kubectl_child: None, + child_wait_handle: None, is_stopped: Arc::new(AtomicBool::new(false)), error_message: Some("error".to_string()), + temp_kubeconfig_path: None, }; assert!(!error_session.is_active()); }