tftsr-devops_investigation/src-tauri/src/shell/session.rs
Shaun Arman 9ae89bf487
All checks were successful
Test / frontend-typecheck (pull_request) Successful in 1m49s
Test / frontend-tests (pull_request) Successful in 1m46s
PR Review Automation / review (pull_request) Successful in 4m24s
Test / rust-fmt-check (pull_request) Successful in 12m1s
Test / rust-clippy (pull_request) Successful in 13m46s
Test / rust-tests (pull_request) Successful in 15m8s
fix(security): address automated code review findings
BLOCKER fixes:
- Implement create_azuredevops_workitem instead of returning a stub error,
  reusing the existing create_work_item integration helper and writing an
  audit-log entry on success.
- Log kill failures in PtySession::Drop so leaked child processes surface
  in tracing rather than being silently swallowed.
- Add explicit PTY cleanup on every exit path of run_session_io (process
  exit, read error, write error, resize error, terminate command).
- Treat PTY resize failures as fatal: emit terminal-error to the frontend
  and break the session loop instead of just warning.

WARNING fixes:
- Remove the dead extract_json_path_value helper from commands/kube.rs.
- Wrap temp kubeconfig files in commands/metrics.rs in an RAII guard
  (TempKubeconfig) so they're removed on early-return / panic paths.
- Wrap temp kubeconfig files in commands/shell.rs PTY-session starters
  in a disarmable RAII guard (KubeconfigGuard); if kubectl resolution
  fails we no longer leak the file.
- Drop the `clear;` prefix from the kubectl-exec shell fallback so
  containers without `clear`/`tput` don't print a confusing error.

SUGGESTION fixes:
- Document why node CPU/memory percentages are 0.0 in metrics::client
  and link the gap to future work fetching node capacity.
- Add a module-level doc comment to AppState describing the
  synchronization expectations (std vs tokio Mutex) for each public
  field, and warn against holding std::sync MutexGuards across .await.

Verified: cargo fmt --check, cargo clippy -- -D warnings, and
cargo test (377 passed, 6 ignored) all pass.
2026-06-09 18:08:58 -05:00

393 lines
13 KiB
Rust

// PTY Session Management
//
// This module manages the lifecycle of PTY sessions, providing:
// - Session creation and tracking
// - Bidirectional I/O streaming via Tauri events
// - Session cleanup and resource management
//
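// Each session has a unique ID and runs in a background tokio task that:
// 1. Polls the PTY for output every 50 ms
// 2. Emits that output to the frontend via Tauri events
// 3. Forwards stdin data and control commands (resize, terminate) to the PTY
// 4. Monitors session liveness
// 5. Cleans up the PTY and removes the session from the registry on exit or error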
use crate::shell::pty::PtySession;
use anyhow::{Context, Result};
use std::collections::HashMap;
use std::sync::Arc;
use tauri::{AppHandle, Emitter};
use tokio::sync::{mpsc, RwLock};
use tokio::time::{interval, Duration};
use tracing::{debug, error, info, warn};
use uuid::Uuid;
/// Metadata and control handles for one registered PTY session.
///
/// One value of this type is stored in the [`SessionManager`] registry, keyed
/// by [`SessionInfo::id`], from registration until the session's background
/// I/O task finishes. Clones share the same underlying channels, so a clone
/// can be used to feed stdin or control commands to the same session task.
#[derive(Debug)]
pub struct SessionInfo {
    /// Unique session identifier; also the key under which the session is
    /// stored in the registry.
    pub id: String,
    /// Cluster identifier supplied by the caller when the session was started.
    pub cluster_id: String,
    /// Kubernetes namespace passed to `kubectl`.
    pub namespace: String,
    /// Pod name passed to `kubectl`.
    pub pod: String,
    /// Optional container name passed to `kubectl`.
    pub container: Option<String>,
    /// Whether the session was started as an exec or an attach session.
    pub session_type: SessionType,
    /// UTC timestamp taken when the session was registered.
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// Channel to send stdin data to the session task
    pub stdin_tx: mpsc::UnboundedSender<Vec<u8>>,
    /// Channel to send control commands
    pub control_tx: mpsc::UnboundedSender<ControlCommand>,
}
/// Which kind of `kubectl` session backs a PTY.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SessionType {
    /// Session created by `SessionManager::start_exec_session`.
    Exec,
    /// Session created by `SessionManager::start_attach_session`.
    Attach,
}
/// Out-of-band instruction sent to a session's background I/O task.
#[derive(Debug)]
pub enum ControlCommand {
    /// Resize the session's PTY to the given dimensions.
    Resize {
        /// New number of rows.
        rows: u16,
        /// New number of columns.
        cols: u16,
    },
    /// Stop the session: the I/O task cleans up the PTY and exits its loop.
    Terminate,
}
/// Parameters for starting a session.
///
/// Every field is plain owned data, so the type derives `Debug` (for
/// diagnostics and logging) and `Clone` (so callers can keep a copy after
/// handing the parameters to a `start_*_session` call, which consumes them).
#[derive(Debug, Clone)]
pub struct SessionParams {
    /// Cluster identifier; recorded in the session metadata.
    pub cluster_id: String,
    /// Kubernetes namespace of the target pod.
    pub namespace: String,
    /// Name of the target pod.
    pub pod: String,
    /// Optional container name within the pod.
    pub container: Option<String>,
    /// Path of the `kubectl` binary used to spawn the session.
    pub kubectl_path: String,
    /// Optional kubeconfig path forwarded to the PTY spawner.
    pub kubeconfig_path: Option<String>,
}
/// Global session registry
///
/// Owns the map of registered sessions, keyed by session ID. The map is
/// wrapped in `Arc<RwLock<..>>` because each session's background I/O task
/// receives its own clone of the `Arc` and removes the session's entry when
/// the task finishes.
pub struct SessionManager {
// Session ID -> metadata and stdin/control channels for that session.
sessions: Arc<RwLock<HashMap<String, SessionInfo>>>,
}
impl Default for SessionManager {
fn default() -> Self {
Self::new()
}
}
impl SessionManager {
pub fn new() -> Self {
Self {
sessions: Arc::new(RwLock::new(HashMap::new())),
}
}
/// Start a new `kubectl exec` session and return its session ID.
///
/// The PTY process is spawned first; only a successfully spawned PTY is
/// registered and handed to a background I/O task.
///
/// # Errors
///
/// Returns an error when the PTY process cannot be spawned, or when
/// registration reports a failure.
pub async fn start_exec_session(
    &self,
    app_handle: AppHandle,
    params: SessionParams,
) -> Result<String> {
    // Minted up front so the same ID goes to the registry and to the caller.
    let session_id = Uuid::now_v7().to_string();

    let container = params.container.as_deref();
    let kubeconfig = params.kubeconfig_path.as_deref();
    let pty_session = PtySession::spawn_kubectl_exec(
        &params.kubectl_path,
        &params.namespace,
        &params.pod,
        container,
        kubeconfig,
    )
    .context("Failed to spawn kubectl exec session")?;

    let registered = self.register_session(
        app_handle,
        session_id.clone(),
        params,
        SessionType::Exec,
        pty_session,
    );
    registered.await.map(|()| session_id)
}
/// Start a new `kubectl attach` session and return its session ID.
///
/// The PTY process is spawned first; only a successfully spawned PTY is
/// registered and handed to a background I/O task.
///
/// # Errors
///
/// Returns an error when the PTY process cannot be spawned, or when
/// registration reports a failure.
pub async fn start_attach_session(
    &self,
    app_handle: AppHandle,
    params: SessionParams,
) -> Result<String> {
    // Minted up front so the same ID goes to the registry and to the caller.
    let session_id = Uuid::now_v7().to_string();

    let container = params.container.as_deref();
    let kubeconfig = params.kubeconfig_path.as_deref();
    let pty_session = PtySession::spawn_kubectl_attach(
        &params.kubectl_path,
        &params.namespace,
        &params.pod,
        container,
        kubeconfig,
    )
    .context("Failed to spawn kubectl attach session")?;

    let registered = self.register_session(
        app_handle,
        session_id.clone(),
        params,
        SessionType::Attach,
        pty_session,
    );
    registered.await.map(|()| session_id)
}
/// Register and start managing a PTY session
async fn register_session(
&self,
app_handle: AppHandle,
session_id: String,
params: SessionParams,
session_type: SessionType,
pty_session: PtySession,
) -> Result<()> {
let (stdin_tx, stdin_rx) = mpsc::unbounded_channel();
let (control_tx, control_rx) = mpsc::unbounded_channel();
let info = SessionInfo {
id: session_id.clone(),
cluster_id: params.cluster_id,
namespace: params.namespace,
pod: params.pod,
container: params.container,
session_type,
created_at: chrono::Utc::now(),
stdin_tx,
control_tx,
};
// Add to registry
{
let mut sessions = self.sessions.write().await;
sessions.insert(session_id.clone(), info);
}
// Spawn session I/O task
let sessions_clone = self.sessions.clone();
let session_id_clone = session_id.clone();
tokio::spawn(async move {
if let Err(e) = Self::run_session_io(
app_handle,
session_id_clone.clone(),
pty_session,
stdin_rx,
control_rx,
)
.await
{
error!("Session {} I/O task failed: {}", session_id_clone, e);
}
// Remove from registry on exit
let mut sessions = sessions_clone.write().await;
sessions.remove(&session_id_clone);
info!("Session {} removed from registry", session_id_clone);
});
info!("Session {} started: {:?}", session_id, session_type);
Ok(())
}
/// Main I/O loop for a session.
///
/// Multiplexes three event sources until the session ends:
///
/// * a 50 ms poll timer — checks whether the PTY child is still alive and
///   forwards any pending output as a `terminal-output-<id>` event;
/// * `stdin_rx` — bytes from the frontend, written to the PTY;
/// * `control_rx` — [`ControlCommand`]s (resize / terminate).
///
/// The loop ends when the child exits (`terminal-closed-<id>` is emitted
/// after any remaining output has been flushed), when a read, write or
/// resize fails (`terminal-error-<id>` is emitted), or when a
/// `Terminate` command arrives. Every exit path runs the explicit PTY
/// cleanup before the `PtySession` is dropped.
///
/// # Errors
///
/// Currently always returns `Ok(())`; failures are logged and reported to
/// the frontend through events rather than through the return value.
async fn run_session_io(
    app_handle: AppHandle,
    session_id: String,
    mut pty_session: PtySession,
    mut stdin_rx: mpsc::UnboundedReceiver<Vec<u8>>,
    mut control_rx: mpsc::UnboundedReceiver<ControlCommand>,
) -> Result<()> {
    // Upper bound on the number of reads used to flush output after the
    // child has exited, so a misbehaving PTY cannot keep the task spinning.
    const MAX_EXIT_DRAIN_READS: usize = 64;

    // Explicit cleanup helper invoked on every exit path. While
    // `PtySession::Drop` already best-effort kills the child, doing it here
    // first lets us log the outcome and surface failures via tracing.
    // After the loop returns, the PtySession is dropped, releasing the
    // master/slave PTY handles.
    fn cleanup(pty: &mut PtySession, session_id: &str, reason: &str) {
        debug!(
            "Cleaning up PTY for session {} (reason: {})",
            session_id, reason
        );
        if pty.is_alive() {
            if let Err(e) = pty.kill() {
                warn!(
                    "Failed to kill PTY child for session {} during cleanup: {}",
                    session_id, e
                );
            }
        }
    }

    // Event names never change for the lifetime of the session, so build
    // them once instead of re-formatting on every tick.
    let output_event = format!("terminal-output-{}", session_id);
    let error_event = format!("terminal-error-{}", session_id);
    let closed_event = format!("terminal-closed-{}", session_id);

    let mut poll_interval = interval(Duration::from_millis(50));
    // The default (`Burst`) replays every missed tick back-to-back after a
    // stall; for a polling loop that only produces a flurry of redundant
    // reads. `Delay` simply resumes the normal cadence.
    poll_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

    loop {
        tokio::select! {
            // Read from PTY stdout/stderr
            _ = poll_interval.tick() => {
                if !pty_session.is_alive() {
                    debug!("Session {} PTY process exited", session_id);
                    // Output written by the child shortly before it exited
                    // may still be pending; flush it before announcing the
                    // close so the frontend does not lose the final lines.
                    // Assumes `PtySession::read` returns an empty buffer
                    // (or an error) instead of blocking when nothing is
                    // pending — the regular polling path below relies on
                    // the same behaviour.
                    for _ in 0..MAX_EXIT_DRAIN_READS {
                        match pty_session.read() {
                            Ok(data) if !data.is_empty() => {
                                if let Err(e) = app_handle.emit(&output_event, data) {
                                    warn!("Failed to emit terminal output for session {}: {}", session_id, e);
                                }
                            }
                            // Nothing left to flush.
                            Ok(_) => break,
                            // A read error after exit is expected on some
                            // platforms; it just ends the flush.
                            Err(e) => {
                                debug!("Session {} output drain stopped: {}", session_id, e);
                                break;
                            }
                        }
                    }
                    let _ = app_handle.emit(&closed_event, ());
                    cleanup(&mut pty_session, &session_id, "process exited");
                    break;
                }
                match pty_session.read() {
                    Ok(data) if !data.is_empty() => {
                        // Emit to frontend
                        if let Err(e) = app_handle.emit(&output_event, data) {
                            warn!("Failed to emit terminal output for session {}: {}", session_id, e);
                        }
                    }
                    Ok(_) => {
                        // No data available
                    }
                    Err(e) => {
                        error!("Failed to read from PTY for session {}: {}", session_id, e);
                        let _ = app_handle.emit(&error_event, e.to_string());
                        cleanup(&mut pty_session, &session_id, "read error");
                        break;
                    }
                }
            }
            // Handle stdin from frontend
            Some(data) = stdin_rx.recv() => {
                if let Err(e) = pty_session.write(&data) {
                    error!("Failed to write to PTY for session {}: {}", session_id, e);
                    let _ = app_handle.emit(&error_event, e.to_string());
                    cleanup(&mut pty_session, &session_id, "write error");
                    break;
                }
            }
            // Handle control commands
            Some(cmd) = control_rx.recv() => {
                match cmd {
                    ControlCommand::Resize { rows, cols } => {
                        if let Err(e) = pty_session.resize(rows, cols) {
                            // A failed resize means the PTY is in an
                            // unrecoverable state (master fd closed, slave
                            // signal failed, etc.). Surface the error to
                            // the frontend and terminate the session
                            // rather than continuing with a stale layout.
                            error!(
                                "Failed to resize PTY for session {}: {}. Terminating session.",
                                session_id, e
                            );
                            let _ = app_handle.emit(
                                &error_event,
                                format!("PTY resize failed; session terminated: {e}"),
                            );
                            cleanup(&mut pty_session, &session_id, "resize error");
                            break;
                        }
                    }
                    ControlCommand::Terminate => {
                        // NOTE(review): no event is emitted on this path;
                        // presumably the frontend requested the termination
                        // and needs no notification — confirm.
                        info!("Session {} received terminate command", session_id);
                        cleanup(&mut pty_session, &session_id, "terminate command");
                        break;
                    }
                }
            }
        }
    }
    Ok(())
}
/// Queue `data` as stdin for the session identified by `session_id`.
///
/// The bytes are handed to the session's background task through its
/// stdin channel; the task performs the actual PTY write.
///
/// # Errors
///
/// Fails when no session with that ID is registered, or when the session
/// task's receiving end of the channel is gone.
pub async fn send_stdin(&self, session_id: &str, data: Vec<u8>) -> Result<()> {
    let registry = self.sessions.read().await;
    let stdin_tx = &registry
        .get(session_id)
        .context("Session not found")?
        .stdin_tx;
    stdin_tx
        .send(data)
        .context("Failed to send stdin data to session task")
}
/// Ask the session identified by `session_id` to resize its PTY to
/// `rows` x `cols`.
///
/// The request is delivered as a [`ControlCommand::Resize`] on the
/// session's control channel; the resize itself happens in the session task.
///
/// # Errors
///
/// Fails when no session with that ID is registered, or when the command
/// cannot be delivered to the session task.
pub async fn resize_session(&self, session_id: &str, rows: u16, cols: u16) -> Result<()> {
    let command = ControlCommand::Resize { rows, cols };
    let registry = self.sessions.read().await;
    let entry = registry.get(session_id).context("Session not found")?;
    entry
        .control_tx
        .send(command)
        .context("Failed to send resize command to session task")
}
/// Ask the session identified by `session_id` to shut down.
///
/// A [`ControlCommand::Terminate`] is placed on the session's control
/// channel; the session task performs the cleanup when it processes the
/// command, so this returns before the session has actually ended.
///
/// # Errors
///
/// Fails when no session with that ID is registered, or when the command
/// cannot be delivered to the session task.
pub async fn terminate_session(&self, session_id: &str) -> Result<()> {
    let registry = self.sessions.read().await;
    let control_tx = &registry
        .get(session_id)
        .context("Session not found")?
        .control_tx;
    control_tx
        .send(ControlCommand::Terminate)
        .context("Failed to send terminate command to session task")
}
/// Return a snapshot of every session currently in the registry.
///
/// The entries are clones, so the returned vector stays valid after the
/// registry lock has been released; ordering follows the map's iteration
/// order and is therefore unspecified.
pub async fn list_sessions(&self) -> Vec<SessionInfo> {
    let registry = self.sessions.read().await;
    let mut snapshot = Vec::with_capacity(registry.len());
    snapshot.extend(registry.values().cloned());
    snapshot
}
/// Look up one session by ID.
///
/// Returns a clone of the registry entry, or `None` when no session with
/// that ID is registered.
pub async fn get_session(&self, session_id: &str) -> Option<SessionInfo> {
    let registry = self.sessions.read().await;
    let entry = registry.get(session_id)?;
    Some(entry.clone())
}
}
impl Clone for SessionInfo {
    /// Field-by-field copy. The channel senders are cloned as handles, so
    /// the clone talks to the same session task as the original.
    fn clone(&self) -> Self {
        // Exhaustive destructuring: adding a field to `SessionInfo` without
        // handling it here becomes a compile error.
        let Self {
            id,
            cluster_id,
            namespace,
            pod,
            container,
            session_type,
            created_at,
            stdin_tx,
            control_tx,
        } = self;

        Self {
            id: id.clone(),
            cluster_id: cluster_id.clone(),
            namespace: namespace.clone(),
            pod: pod.clone(),
            container: container.clone(),
            session_type: *session_type,
            created_at: *created_at,
            stdin_tx: stdin_tx.clone(),
            control_tx: control_tx.clone(),
        }
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for the parts of the session manager that do not need a
    //! real PTY or a Tauri `AppHandle`.
    use super::*;

    #[tokio::test]
    async fn test_session_manager_creation() {
        let manager = SessionManager::new();
        let sessions = manager.list_sessions().await;
        assert_eq!(sessions.len(), 0, "New manager should have no sessions");
    }

    #[tokio::test]
    async fn test_default_manager_is_empty() {
        let manager = SessionManager::default();
        assert!(
            manager.list_sessions().await.is_empty(),
            "Default manager should have no sessions"
        );
    }

    #[tokio::test]
    async fn test_unknown_session_lookup_returns_none() {
        let manager = SessionManager::new();
        assert!(manager.get_session("missing").await.is_none());
    }

    #[tokio::test]
    async fn test_unknown_session_commands_are_rejected() {
        let manager = SessionManager::new();

        assert!(manager
            .send_stdin("missing", b"ls\n".to_vec())
            .await
            .is_err());
        assert!(manager.resize_session("missing", 24, 80).await.is_err());

        let err = manager
            .terminate_session("missing")
            .await
            .expect_err("terminating an unknown session must fail");
        assert_eq!(err.to_string(), "Session not found");
    }

    #[test]
    fn test_session_type_equality() {
        assert_eq!(SessionType::Exec, SessionType::Exec);
        assert_eq!(SessionType::Attach, SessionType::Attach);
        assert_ne!(SessionType::Exec, SessionType::Attach);
    }

    #[test]
    fn test_control_command_debug() {
        let cmd = ControlCommand::Resize { rows: 24, cols: 80 };
        let debug_str = format!("{:?}", cmd);
        assert!(debug_str.contains("Resize"));

        let terminate_str = format!("{:?}", ControlCommand::Terminate);
        assert!(terminate_str.contains("Terminate"));
    }
}