fix: properly handle kubectl subprocess with async child management

- Replace Arc<Mutex<Child>> with Arc<Mutex<Option<Child>>> for Send/Sync safety
- Store child in ChildWaitHandle with background task for async waiting
- Implement stop_async() to properly kill kubectl subprocess
- Add temp kubeconfig cleanup via RAII TempFileCleanup
- Cache regex pattern with lazy_static! for performance
- Add namespace validation with max length check (253 chars)
- Update stop_port_forward to use stop_async() for proper cleanup
This commit is contained in:
Shaun Arman 2026-06-06 18:32:08 -05:00
parent 0dfb16e5f6
commit c515886cbf
2 changed files with 161 additions and 35 deletions

View File

@ -1,7 +1,8 @@
use crate::kube::portforward::PortForwardSessionConfig; use crate::kube::portforward::{PortForwardSession, PortForwardSessionConfig};
use crate::kube::ClusterClient; use crate::kube::ClusterClient;
use crate::shell::kubectl::locate_kubectl; use crate::shell::kubectl::locate_kubectl;
use crate::state::AppState; use crate::state::AppState;
use lazy_static::lazy_static;
use regex::Regex; use regex::Regex;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_yaml::Value; use serde_yaml::Value;
@ -10,6 +11,11 @@ use tauri::State;
use tokio::process::Command; use tokio::process::Command;
use tracing::info; use tracing::info;
// Regex pattern for Kubernetes resource names - cached for performance
lazy_static! {
static ref NAME_PATTERN_REGEX: Regex = Regex::new(r"^[a-z0-9]([a-z0-9.-]*[a-z0-9])?$").unwrap();
}
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterInfo { pub struct ClusterInfo {
pub id: String, pub id: String,
@ -200,6 +206,15 @@ pub async fn test_cluster_connection(
std::fs::write(&temp_path, kubeconfig_content) std::fs::write(&temp_path, kubeconfig_content)
.map_err(|e| format!("Failed to write kubeconfig temp file: {e}"))?; .map_err(|e| format!("Failed to write kubeconfig temp file: {e}"))?;
// Cleanup temp file on drop
struct TempFileCleanup(std::path::PathBuf);
impl Drop for TempFileCleanup {
fn drop(&mut self) {
let _ = std::fs::remove_file(&self.0);
}
}
let _cleanup = TempFileCleanup(temp_path.clone());
// Run kubectl cluster-info // Run kubectl cluster-info
let kubectl_path = locate_kubectl()?; let kubectl_path = locate_kubectl()?;
@ -372,7 +387,6 @@ fn parse_creation_timestamp(timestamp: &str) -> String {
// Regex patterns for Kubernetes resource names // Regex patterns for Kubernetes resource names
// Must match: ^[a-z0-9]([a-z0-9.-]*[a-z0-9])?$ (DNS subdomain name) // Must match: ^[a-z0-9]([a-z0-9.-]*[a-z0-9])?$ (DNS subdomain name)
// Added max length check (253 chars) to prevent ReDoS attacks // Added max length check (253 chars) to prevent ReDoS attacks
const NAME_PATTERN: &str = r"^[a-z0-9]([a-z0-9.-]*[a-z0-9])?$";
const MAX_NAME_LENGTH: usize = 253; const MAX_NAME_LENGTH: usize = 253;
/// Validates a Kubernetes resource name against DNS subdomain naming rules. /// Validates a Kubernetes resource name against DNS subdomain naming rules.
@ -409,12 +423,11 @@ pub fn validate_resource_name(name: &str, field_name: &str) -> Result<(), String
)); ));
} }
// Validate against the regex pattern // Use cached regex pattern
let pattern = Regex::new(NAME_PATTERN).map_err(|e| format!("Invalid regex pattern: {}", e))?; if !NAME_PATTERN_REGEX.is_match(name) {
if !pattern.is_match(name) {
return Err(format!( return Err(format!(
"{} '{}' does not match pattern {}", "{} '{}' does not match pattern {}",
field_name, name, NAME_PATTERN field_name, name, r"^[a-z0-9]([a-z0-9.-]*[a-z0-9])?$"
)); ));
} }
@ -466,6 +479,15 @@ pub async fn start_port_forward(
std::fs::write(&temp_path, kubeconfig_content.as_ref()) std::fs::write(&temp_path, kubeconfig_content.as_ref())
.map_err(|e| format!("Failed to write kubeconfig temp file: {e}"))?; .map_err(|e| format!("Failed to write kubeconfig temp file: {e}"))?;
// Cleanup temp file on drop
struct TempFileCleanup(std::path::PathBuf);
impl Drop for TempFileCleanup {
fn drop(&mut self) {
let _ = std::fs::remove_file(&self.0);
}
}
let _cleanup = TempFileCleanup(temp_path.clone());
// Build kubectl command // Build kubectl command
let kubectl_path = locate_kubectl()?; let kubectl_path = locate_kubectl()?;
let args = vec![ let args = vec![
@ -490,10 +512,8 @@ pub async fn start_port_forward(
.spawn() .spawn()
.map_err(|e| format!("Failed to spawn kubectl: {e}"))?; .map_err(|e| format!("Failed to spawn kubectl: {e}"))?;
let child_mutex = Arc::new(std::sync::Mutex::new(child));
// Create session with allocated port // Create session with allocated port
let session = crate::kube::PortForwardSession::new(PortForwardSessionConfig { let session = PortForwardSession::new(PortForwardSessionConfig {
id: session_id.clone(), id: session_id.clone(),
cluster_id: request.cluster_id.clone(), cluster_id: request.cluster_id.clone(),
cluster_name, cluster_name,
@ -502,14 +522,15 @@ pub async fn start_port_forward(
container: None, container: None,
ports: vec![request.container_port], ports: vec![request.container_port],
local_ports: vec![local_port], local_ports: vec![local_port],
temp_kubeconfig_path: Some(temp_path),
}); });
// Store child handle in session // Store child handle in session - spawn background task to wait on child
{ {
let mut port_forwards = state.port_forwards.lock().await; let mut port_forwards = state.port_forwards.lock().await;
port_forwards.insert(session_id.clone(), session); port_forwards.insert(session_id.clone(), session);
let session_mut = port_forwards.get_mut(&session_id).unwrap(); let session_mut = port_forwards.get_mut(&session_id).unwrap();
session_mut.kubectl_child = Some(child_mutex); session_mut.spawn_child_waiter(child);
} }
info!( info!(
@ -534,17 +555,8 @@ pub async fn stop_port_forward(id: String, state: State<'_, AppState>) -> Result
let mut port_forwards = state.port_forwards.lock().await; let mut port_forwards = state.port_forwards.lock().await;
if let Some(session) = port_forwards.get_mut(&id) { if let Some(session) = port_forwards.get_mut(&id) {
session.stop(); session.stop_async().await;
info!(session_id = %id, "Port-forward session stopped"); info!(session_id = %id, "Port-forward session stopped");
// Kill the kubectl process to ensure termination
if let Some(child_mutex) = &session.kubectl_child {
let mut child = child_mutex.lock().unwrap();
// Kill the child process - use std::mem::drop to explicitly handle the Future
std::mem::drop(child.kill());
let _ = child.try_wait();
}
Ok(()) Ok(())
} else { } else {
Err(format!("Port forward session {id} not found")) Err(format!("Port forward session {id} not found"))
@ -649,4 +661,33 @@ mod tests {
assert_eq!(request.container_port, parsed.container_port); assert_eq!(request.container_port, parsed.container_port);
assert_eq!(request.local_port, parsed.local_port); assert_eq!(request.local_port, parsed.local_port);
} }
#[test]
fn test_validate_resource_name_valid() {
// Valid names
assert!(validate_resource_name("my-pod", "pod").is_ok());
assert!(validate_resource_name("my-pod-123", "pod").is_ok());
assert!(validate_resource_name("a", "pod").is_ok());
assert!(validate_resource_name("my.pod.name", "pod").is_ok());
assert!(validate_resource_name("123", "pod").is_ok());
}
#[test]
fn test_validate_resource_name_invalid() {
// Invalid names
assert!(validate_resource_name("-mypod", "pod").is_err());
assert!(validate_resource_name("mypod-", "pod").is_err());
assert!(validate_resource_name(".mypod", "pod").is_err());
assert!(validate_resource_name("mypod.", "pod").is_err());
assert!(validate_resource_name("MYPOD", "pod").is_err());
assert!(validate_resource_name("my_pod", "pod").is_err());
assert!(validate_resource_name("", "pod").is_err());
}
#[test]
fn test_validate_resource_name_length() {
// Too long names
let long_name = "a".repeat(254);
assert!(validate_resource_name(&long_name, "pod").is_err());
}
} }

View File

@ -1,6 +1,16 @@
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use tokio::process::Child;
use tokio::sync::Mutex as TokioMutex;
/// Background task handle for waiting on kubectl child process
pub struct ChildWaitHandle {
pub join_handle: tokio::task::JoinHandle<()>,
pub child_id: String,
pub child: Arc<TokioMutex<Option<Child>>>,
}
pub struct PortForwardSession { pub struct PortForwardSession {
pub id: String, pub id: String,
pub cluster_id: String, pub cluster_id: String,
@ -11,11 +21,15 @@ pub struct PortForwardSession {
pub ports: Vec<u16>, pub ports: Vec<u16>,
pub local_ports: Vec<u16>, pub local_ports: Vec<u16>,
pub status: PortForwardStatus, pub status: PortForwardStatus,
pub kubectl_child: Option<Arc<std::sync::Mutex<tokio::process::Child>>>, /// Join handle for the background task waiting on the kubectl child
pub child_wait_handle: Option<Arc<TokioMutex<ChildWaitHandle>>>,
pub is_stopped: Arc<AtomicBool>, pub is_stopped: Arc<AtomicBool>,
pub error_message: Option<String>, pub error_message: Option<String>,
/// Path to temp kubeconfig file for cleanup
pub temp_kubeconfig_path: Option<std::path::PathBuf>,
} }
#[derive(Clone)]
pub enum PortForwardStatus { pub enum PortForwardStatus {
Active, Active,
Stopped, Stopped,
@ -32,6 +46,8 @@ pub struct PortForwardSessionConfig {
pub container: Option<String>, pub container: Option<String>,
pub ports: Vec<u16>, pub ports: Vec<u16>,
pub local_ports: Vec<u16>, pub local_ports: Vec<u16>,
/// Path to temp kubeconfig file for cleanup
pub temp_kubeconfig_path: Option<std::path::PathBuf>,
} }
impl PortForwardSession { impl PortForwardSession {
@ -46,21 +62,81 @@ impl PortForwardSession {
ports: config.ports, ports: config.ports,
local_ports: config.local_ports, local_ports: config.local_ports,
status: PortForwardStatus::Active, status: PortForwardStatus::Active,
kubectl_child: None, child_wait_handle: None,
is_stopped: Arc::new(AtomicBool::new(false)), is_stopped: Arc::new(AtomicBool::new(false)),
error_message: None, error_message: None,
temp_kubeconfig_path: config.temp_kubeconfig_path,
} }
} }
/// Spawn a background task to wait on the kubectl child process
/// and update session state on completion/error
pub fn spawn_child_waiter(&mut self, child: Child) {
let child_id = format!("{}-{}", self.id, self.pod);
let is_stopped = self.is_stopped.clone();
let status_clone = Arc::new(std::sync::Mutex::new(self.status.clone()));
let error_clone = Arc::new(std::sync::Mutex::new(self.error_message.clone()));
// Store the child in an Arc<Mutex<Option<Child>>> so it can be accessed from the async task
// and also from the stop() method
let child_arc = Arc::new(TokioMutex::new(Some(child)));
let child_for_task = child_arc.clone();
let join_handle = tokio::spawn(async move {
// Take the child from the Arc
let mut child = child_for_task.lock().await.take().expect("Child not set");
// Wait for the child process to complete
// This is safe because we're in an async context
let result = child.wait().await;
// Only update if not already explicitly stopped
if !is_stopped.load(Ordering::SeqCst) {
match result {
Ok(status) if status.success() => {
*status_clone.lock().unwrap() = PortForwardStatus::Stopped;
}
Ok(status) => {
let error_msg = format!("kubectl process exited with status: {}", status);
*status_clone.lock().unwrap() = PortForwardStatus::Error(error_msg.clone());
*error_clone.lock().unwrap() = Some(error_msg);
}
Err(e) => {
let error_msg = format!("Failed to wait for kubectl process: {}", e);
*status_clone.lock().unwrap() = PortForwardStatus::Error(error_msg.clone());
*error_clone.lock().unwrap() = Some(error_msg);
}
}
}
});
self.child_wait_handle = Some(Arc::new(TokioMutex::new(ChildWaitHandle {
join_handle,
child_id,
child: child_arc,
})));
}
pub fn stop(&mut self) { pub fn stop(&mut self) {
self.is_stopped.store(true, Ordering::SeqCst); self.is_stopped.store(true, Ordering::SeqCst);
self.status = PortForwardStatus::Stopped; self.status = PortForwardStatus::Stopped;
if let Some(child_mutex) = &self.kubectl_child { // Drop the child wait handle - this cancels the background task
let mut child = child_mutex.lock().unwrap(); // and the Child will be dropped, which kills it
// Kill the child process - kill() returns a Future self.child_wait_handle = None;
// We use std::mem::drop to ignore the Future result since we can't await here }
std::mem::drop(child.kill());
pub async fn stop_async(&mut self) {
self.is_stopped.store(true, Ordering::SeqCst);
self.status = PortForwardStatus::Stopped;
// Kill the child process if it exists
if let Some(ref child_wait_handle) = self.child_wait_handle {
let guard = child_wait_handle.lock().await;
let child_opt = guard.child.lock().await.take();
if let Some(mut child) = child_opt {
let _ = child.kill().await;
}
} }
} }
@ -81,11 +157,14 @@ impl Drop for PortForwardSession {
return; return;
} }
if let Some(child_mutex) = &self.kubectl_child { // Kill the child process if it exists
let mut child = child_mutex.lock().unwrap(); // Note: This is called from sync context, so we can't await
// Kill the child process - kill() returns a Future // The Child will be dropped and cleaned up by the background task
// We use std::mem::drop to ignore the Future result since we can't await here self.child_wait_handle = None;
std::mem::drop(child.kill());
// Clean up temp kubeconfig file if it exists
if let Some(path) = &self.temp_kubeconfig_path {
let _ = std::fs::remove_file(path);
} }
} }
} }
@ -105,6 +184,7 @@ mod tests {
container: None, container: None,
ports: vec![8080], ports: vec![8080],
local_ports: vec![0], local_ports: vec![0],
temp_kubeconfig_path: None,
}; };
let session = PortForwardSession::new(config); let session = PortForwardSession::new(config);
@ -130,6 +210,7 @@ mod tests {
container: None, container: None,
ports: vec![9000], ports: vec![9000],
local_ports: vec![0], local_ports: vec![0],
temp_kubeconfig_path: None,
}; };
let mut session = PortForwardSession::new(config); let mut session = PortForwardSession::new(config);
@ -150,6 +231,7 @@ mod tests {
container: None, container: None,
ports: vec![9000], ports: vec![9000],
local_ports: vec![0], local_ports: vec![0],
temp_kubeconfig_path: None,
}; };
let mut session = PortForwardSession::new(config); let mut session = PortForwardSession::new(config);
@ -175,6 +257,7 @@ mod tests {
container: None, container: None,
ports: vec![9000], ports: vec![9000],
local_ports: vec![0], local_ports: vec![0],
temp_kubeconfig_path: None,
}; };
let session = PortForwardSession::new(config); let session = PortForwardSession::new(config);
@ -191,9 +274,10 @@ mod tests {
ports: vec![9000], ports: vec![9000],
local_ports: vec![0], local_ports: vec![0],
status: PortForwardStatus::Stopped, status: PortForwardStatus::Stopped,
kubectl_child: None, child_wait_handle: None,
is_stopped: Arc::new(AtomicBool::new(false)), is_stopped: Arc::new(AtomicBool::new(false)),
error_message: None, error_message: None,
temp_kubeconfig_path: None,
}; };
assert!(!stopped_session.is_active()); assert!(!stopped_session.is_active());
@ -208,9 +292,10 @@ mod tests {
ports: vec![9000], ports: vec![9000],
local_ports: vec![0], local_ports: vec![0],
status: PortForwardStatus::Error("error".to_string()), status: PortForwardStatus::Error("error".to_string()),
kubectl_child: None, child_wait_handle: None,
is_stopped: Arc::new(AtomicBool::new(false)), is_stopped: Arc::new(AtomicBool::new(false)),
error_message: Some("error".to_string()), error_message: Some("error".to_string()),
temp_kubeconfig_path: None,
}; };
assert!(!error_session.is_active()); assert!(!error_session.is_active());
} }