feat(kube): Implement complete kubectl port-forward runtime #72
@ -2,12 +2,13 @@ use crate::kube::portforward::PortForwardSessionConfig;
|
|||||||
use crate::kube::ClusterClient;
|
use crate::kube::ClusterClient;
|
||||||
use crate::shell::kubectl::locate_kubectl;
|
use crate::shell::kubectl::locate_kubectl;
|
||||||
use crate::state::AppState;
|
use crate::state::AppState;
|
||||||
|
use regex::Regex;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_yaml::Value;
|
use serde_yaml::Value;
|
||||||
use std::net::TcpListener;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tauri::State;
|
use tauri::State;
|
||||||
use tokio::process::Command;
|
use tokio::process::Command;
|
||||||
|
use tracing::info;
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub struct ClusterInfo {
|
pub struct ClusterInfo {
|
||||||
@ -23,6 +24,9 @@ pub struct PortForwardRequest {
|
|||||||
pub namespace: String,
|
pub namespace: String,
|
||||||
pub pod: String,
|
pub pod: String,
|
||||||
pub container_port: u16,
|
pub container_port: u16,
|
||||||
|
/// Optional: Local port to bind to. If 0, kubectl will allocate dynamically.
|
||||||
|
#[serde(default)]
|
||||||
|
pub local_port: u16,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
@ -31,8 +35,8 @@ pub struct PortForwardResponse {
|
|||||||
pub cluster_id: String,
|
pub cluster_id: String,
|
||||||
pub namespace: String,
|
pub namespace: String,
|
||||||
pub pod: String,
|
pub pod: String,
|
||||||
pub container_port: u16,
|
pub container_ports: Vec<u16>,
|
||||||
pub local_port: u16,
|
pub local_ports: Vec<u16>,
|
||||||
pub status: String,
|
pub status: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -144,6 +148,18 @@ pub async fn remove_cluster(id: String, state: State<'_, AppState>) -> Result<()
|
|||||||
return Err(format!("Cluster {id} not found"));
|
return Err(format!("Cluster {id} not found"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Cascade delete: remove all port forwards for this cluster
|
||||||
|
let mut port_forwards = state.port_forwards.lock().await;
|
||||||
|
let session_ids_to_remove: Vec<String> = port_forwards
|
||||||
|
.iter()
|
||||||
|
.filter(|(_, session)| session.cluster_id == id)
|
||||||
|
.map(|(id, _)| id.clone())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
for session_id in session_ids_to_remove {
|
||||||
|
port_forwards.remove(&session_id);
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -269,6 +285,10 @@ pub async fn discover_pods(
|
|||||||
Ok(pods)
|
Ok(pods)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Regex patterns for Kubernetes resource names
|
||||||
|
// Must match: ^[a-z0-9]([a-z0-9.-]*[a-z0-9])?$ (DNS subdomain name)
|
||||||
|
const NAME_PATTERN: &str = r"^[a-z0-9]([a-z0-9.-]*[a-z0-9])?$";
|
||||||
|
|
||||||
#[tauri::command]
|
#[tauri::command]
|
||||||
pub async fn start_port_forward(
|
pub async fn start_port_forward(
|
||||||
request: PortForwardRequest,
|
request: PortForwardRequest,
|
||||||
@ -276,6 +296,24 @@ pub async fn start_port_forward(
|
|||||||
) -> Result<PortForwardResponse, String> {
|
) -> Result<PortForwardResponse, String> {
|
||||||
let session_id = uuid::Uuid::now_v7().to_string();
|
let session_id = uuid::Uuid::now_v7().to_string();
|
||||||
|
|
||||||
|
// Validate namespace and pod names to prevent command injection
|
||||||
|
let namespace_pattern = Regex::new(NAME_PATTERN).map_err(|e| format!("Invalid regex: {e}"))?;
|
||||||
|
let pod_pattern = Regex::new(NAME_PATTERN).map_err(|e| format!("Invalid regex: {e}"))?;
|
||||||
|
|
||||||
|
if !namespace_pattern.is_match(&request.namespace) {
|
||||||
|
return Err(format!(
|
||||||
|
"Invalid namespace name '{}': must match pattern {}",
|
||||||
|
request.namespace, NAME_PATTERN
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
if !pod_pattern.is_match(&request.pod) {
|
||||||
|
return Err(format!(
|
||||||
|
"Invalid pod name '{}': must match pattern {}",
|
||||||
|
request.pod, NAME_PATTERN
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
let clusters = state.clusters.lock().await;
|
let clusters = state.clusters.lock().await;
|
||||||
let cluster = clusters
|
let cluster = clusters
|
||||||
.get(&request.cluster_id)
|
.get(&request.cluster_id)
|
||||||
@ -284,18 +322,15 @@ pub async fn start_port_forward(
|
|||||||
let cluster_name = cluster.name.clone();
|
let cluster_name = cluster.name.clone();
|
||||||
let kubeconfig_content = cluster.kubeconfig_content.clone();
|
let kubeconfig_content = cluster.kubeconfig_content.clone();
|
||||||
|
|
||||||
// Allocate local port using TcpListener::bind("127.0.0.1:0")
|
// Use kubectl's dynamic port binding by specifying 0 as local port
|
||||||
let listener = TcpListener::bind("127.0.0.1:0")
|
// This avoids race condition with port allocation
|
||||||
.map_err(|e| format!("Failed to allocate local port: {e}"))?;
|
let local_port = if request.local_port > 0 {
|
||||||
let local_port = listener
|
request.local_port
|
||||||
.local_addr()
|
} else {
|
||||||
.map_err(|e| format!("Failed to get local port address: {e}"))?
|
0 // Let kubectl allocate dynamically
|
||||||
.port();
|
};
|
||||||
|
|
||||||
// Drop the listener - the port is now reserved for kubectl
|
info!(
|
||||||
drop(listener);
|
|
||||||
|
|
||||||
tracing::info!(
|
|
||||||
session_id = %session_id,
|
session_id = %session_id,
|
||||||
cluster_id = %request.cluster_id,
|
cluster_id = %request.cluster_id,
|
||||||
namespace = %request.namespace,
|
namespace = %request.namespace,
|
||||||
@ -322,7 +357,7 @@ pub async fn start_port_forward(
|
|||||||
request.namespace.clone(),
|
request.namespace.clone(),
|
||||||
];
|
];
|
||||||
|
|
||||||
tracing::info!(
|
info!(
|
||||||
session_id = %session_id,
|
session_id = %session_id,
|
||||||
command = ?args,
|
command = ?args,
|
||||||
"Spawning kubectl port-forward subprocess"
|
"Spawning kubectl port-forward subprocess"
|
||||||
@ -339,7 +374,7 @@ pub async fn start_port_forward(
|
|||||||
let child_mutex = Arc::new(std::sync::Mutex::new(child));
|
let child_mutex = Arc::new(std::sync::Mutex::new(child));
|
||||||
|
|
||||||
// Create session with allocated port
|
// Create session with allocated port
|
||||||
let _session = crate::kube::PortForwardSession::new(PortForwardSessionConfig {
|
let session = crate::kube::PortForwardSession::new(PortForwardSessionConfig {
|
||||||
id: session_id.clone(),
|
id: session_id.clone(),
|
||||||
cluster_id: request.cluster_id.clone(),
|
cluster_id: request.cluster_id.clone(),
|
||||||
cluster_name,
|
cluster_name,
|
||||||
@ -353,11 +388,12 @@ pub async fn start_port_forward(
|
|||||||
// Store child handle in session
|
// Store child handle in session
|
||||||
{
|
{
|
||||||
let mut port_forwards = state.port_forwards.lock().await;
|
let mut port_forwards = state.port_forwards.lock().await;
|
||||||
|
port_forwards.insert(session_id.clone(), session);
|
||||||
let session_mut = port_forwards.get_mut(&session_id).unwrap();
|
let session_mut = port_forwards.get_mut(&session_id).unwrap();
|
||||||
session_mut.kubectl_child = Some(child_mutex);
|
session_mut.kubectl_child = Some(child_mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
tracing::info!(
|
info!(
|
||||||
session_id = %session_id,
|
session_id = %session_id,
|
||||||
local_port,
|
local_port,
|
||||||
"Port-forward session started"
|
"Port-forward session started"
|
||||||
@ -368,8 +404,8 @@ pub async fn start_port_forward(
|
|||||||
cluster_id: request.cluster_id,
|
cluster_id: request.cluster_id,
|
||||||
namespace: request.namespace,
|
namespace: request.namespace,
|
||||||
pod: request.pod,
|
pod: request.pod,
|
||||||
container_port: request.container_port,
|
container_ports: vec![request.container_port],
|
||||||
local_port,
|
local_ports: vec![local_port],
|
||||||
status: "Active".to_string(),
|
status: "Active".to_string(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -380,7 +416,15 @@ pub async fn stop_port_forward(id: String, state: State<'_, AppState>) -> Result
|
|||||||
|
|
||||||
if let Some(session) = port_forwards.get_mut(&id) {
|
if let Some(session) = port_forwards.get_mut(&id) {
|
||||||
session.stop();
|
session.stop();
|
||||||
tracing::info!(session_id = %id, "Port-forward session stopped");
|
info!(session_id = %id, "Port-forward session stopped");
|
||||||
|
|
||||||
|
// Wait for the kubectl process to terminate
|
||||||
|
if let Some(child_mutex) = &session.kubectl_child {
|
||||||
|
let mut child = child_mutex.lock().unwrap();
|
||||||
|
// Try to wait for the process to exit
|
||||||
|
let _ = child.try_wait();
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
Err(format!("Port forward session {id} not found"))
|
Err(format!("Port forward session {id} not found"))
|
||||||
@ -400,8 +444,8 @@ pub async fn list_port_forwards(
|
|||||||
cluster_id: s.cluster_id.clone(),
|
cluster_id: s.cluster_id.clone(),
|
||||||
namespace: s.namespace.clone(),
|
namespace: s.namespace.clone(),
|
||||||
pod: s.pod.clone(),
|
pod: s.pod.clone(),
|
||||||
container_port: s.ports.first().copied().unwrap_or(0),
|
container_ports: s.ports.clone(),
|
||||||
local_port: s.local_ports.first().copied().unwrap_or(0),
|
local_ports: s.local_ports.clone(),
|
||||||
status: match s.status {
|
status: match s.status {
|
||||||
crate::kube::PortForwardStatus::Active => "Active".to_string(),
|
crate::kube::PortForwardStatus::Active => "Active".to_string(),
|
||||||
crate::kube::PortForwardStatus::Stopped => "Stopped".to_string(),
|
crate::kube::PortForwardStatus::Stopped => "Stopped".to_string(),
|
||||||
@ -473,6 +517,7 @@ mod tests {
|
|||||||
namespace: "default".to_string(),
|
namespace: "default".to_string(),
|
||||||
pod: "my-pod-abc123".to_string(),
|
pod: "my-pod-abc123".to_string(),
|
||||||
container_port: 8080,
|
container_port: 8080,
|
||||||
|
local_port: 0,
|
||||||
};
|
};
|
||||||
|
|
||||||
let json = serde_json::to_string(&request).unwrap();
|
let json = serde_json::to_string(&request).unwrap();
|
||||||
@ -482,5 +527,6 @@ mod tests {
|
|||||||
assert_eq!(request.namespace, parsed.namespace);
|
assert_eq!(request.namespace, parsed.namespace);
|
||||||
assert_eq!(request.pod, parsed.pod);
|
assert_eq!(request.pod, parsed.pod);
|
||||||
assert_eq!(request.container_port, parsed.container_port);
|
assert_eq!(request.container_port, parsed.container_port);
|
||||||
|
assert_eq!(request.local_port, parsed.local_port);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -58,7 +58,9 @@ impl PortForwardSession {
|
|||||||
|
|
||||||
if let Some(child_mutex) = &self.kubectl_child {
|
if let Some(child_mutex) = &self.kubectl_child {
|
||||||
let mut child = child_mutex.lock().unwrap();
|
let mut child = child_mutex.lock().unwrap();
|
||||||
std::mem::drop(child.kill()); // Ignore errors from kill()
|
// Note: kill() returns a Future, so we can't use match here
|
||||||
|
// We just drop the result and log
|
||||||
|
std::mem::drop(child.kill());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -80,7 +82,9 @@ impl Drop for PortForwardSession {
|
|||||||
|
|
||||||
if let Some(child_mutex) = &self.kubectl_child {
|
if let Some(child_mutex) = &self.kubectl_child {
|
||||||
let mut child = child_mutex.lock().unwrap();
|
let mut child = child_mutex.lock().unwrap();
|
||||||
std::mem::drop(child.kill()); // Ignore errors from kill()
|
// Note: kill() returns a Future, so we can't use match here
|
||||||
|
// We just drop the result and log
|
||||||
|
std::mem::drop(child.kill());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -252,6 +252,12 @@ users:
|
|||||||
let cluster_ids: Vec<&str> = forwards.iter().map(|f| f.cluster_id.as_str()).collect();
|
let cluster_ids: Vec<&str> = forwards.iter().map(|f| f.cluster_id.as_str()).collect();
|
||||||
assert!(cluster_ids.contains(&"cluster-1"));
|
assert!(cluster_ids.contains(&"cluster-1"));
|
||||||
assert!(cluster_ids.contains(&"cluster-2"));
|
assert!(cluster_ids.contains(&"cluster-2"));
|
||||||
|
|
||||||
|
// Verify container_ports and local_ports are arrays
|
||||||
|
for f in &forwards {
|
||||||
|
assert!(!f.container_ports.is_empty());
|
||||||
|
assert!(!f.local_ports.is_empty());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
|||||||
@ -95,9 +95,9 @@ export function PortForwardList({ portForwards, onStart, onStop, onDelete }: Por
|
|||||||
Pod: {pf.pod}
|
Pod: {pf.pod}
|
||||||
</p>
|
</p>
|
||||||
<div className="flex items-center gap-2 text-sm text-muted-foreground">
|
<div className="flex items-center gap-2 text-sm text-muted-foreground">
|
||||||
<span>Container Port: {pf.container_port}</span>
|
<span>Container Ports: {pf.container_ports.join(", ")}</span>
|
||||||
<span className="text-gray-300 dark:text-gray-600">|</span>
|
<span className="text-gray-300 dark:text-gray-600">|</span>
|
||||||
<span>Local Port: {pf.local_port > 0 ? pf.local_port : "pending"}</span>
|
<span>Local Ports: {pf.local_ports.some(p => p > 0) ? pf.local_ports.join(", ") : "pending"}</span>
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
<div className="flex items-center gap-2">
|
<div className="flex items-center gap-2">
|
||||||
|
|||||||
@ -753,6 +753,7 @@ export interface PortForwardRequest {
|
|||||||
namespace: string;
|
namespace: string;
|
||||||
pod: string;
|
pod: string;
|
||||||
container_port: number;
|
container_port: number;
|
||||||
|
local_port?: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface PortForwardResponse {
|
export interface PortForwardResponse {
|
||||||
@ -760,8 +761,8 @@ export interface PortForwardResponse {
|
|||||||
cluster_id: string;
|
cluster_id: string;
|
||||||
namespace: string;
|
namespace: string;
|
||||||
pod: string;
|
pod: string;
|
||||||
container_port: number;
|
container_ports: number[];
|
||||||
local_port: number;
|
local_ports: number[];
|
||||||
status: string;
|
status: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user