"""Kubernetes API client wrapper.""" from typing import List, Dict, Any, Optional from kubernetes import client, config from kubernetes.client.exceptions import ApiException from rich.console import Console from rich.table import Table from datetime import datetime console = Console() class K8sClient: """Wrapper for Kubernetes API operations.""" def __init__(self): """Initialize Kubernetes client.""" try: config.load_kube_config() self.v1 = client.CoreV1Api() self.apps_v1 = client.AppsV1Api() self.current_context = self.get_current_context() console.print(f"[green]✓[/green] Connected to Kubernetes cluster") if self.current_context: console.print(f"[dim]Current context: {self.current_context}[/dim]") except Exception as e: console.print(f"[red]✗[/red] Failed to connect to Kubernetes: {e}") raise def get_contexts(self) -> List[Dict[str, str]]: """Get list of available contexts.""" try: contexts, active_context = config.list_kube_config_contexts() return [{ 'name': ctx['name'], 'cluster': ctx['context'].get('cluster', ''), 'user': ctx['context'].get('user', ''), 'is_active': ctx['name'] == active_context['name'] } for ctx in contexts] except Exception as e: console.print(f"[red]Error fetching contexts:[/red] {e}") return [] def get_current_context(self) -> Optional[str]: """Get current context name.""" try: contexts, active_context = config.list_kube_config_contexts() return active_context['name'] if active_context else None except Exception as e: console.print(f"[red]Error getting current context:[/red] {e}") return None def switch_context(self, context_name: str) -> bool: """Switch to a different context.""" try: # Load the new context config.load_kube_config(context=context_name) # Reinitialize API clients self.v1 = client.CoreV1Api() self.apps_v1 = client.AppsV1Api() self.current_context = context_name console.print(f"[green]✓[/green] Switched to context: [cyan]{context_name}[/cyan]") return True except Exception as e: console.print(f"[red]Error switching context:[/red] {e}") return False def get_namespaces(self) -> List[str]: """Get list of all namespaces.""" try: namespaces = self.v1.list_namespace() return sorted([ns.metadata.name for ns in namespaces.items]) except ApiException as e: console.print(f"[red]Error fetching namespaces:[/red] {e}") return [] def get_deployments(self, namespace: str) -> List[Dict[str, Any]]: """Get list of deployments in namespace.""" try: deployments = self.apps_v1.list_namespaced_deployment(namespace) return [{ 'name': dep.metadata.name, 'replicas': dep.spec.replicas, 'available': dep.status.available_replicas or 0, 'ready': dep.status.ready_replicas or 0, } for dep in deployments.items] except ApiException as e: console.print(f"[red]Error fetching deployments:[/red] {e}") return [] def get_pods(self, namespace: str, label_selector: Optional[str] = None) -> List[Dict[str, Any]]: """Get list of pods in namespace.""" try: if label_selector: pods = self.v1.list_namespaced_pod(namespace, label_selector=label_selector) else: pods = self.v1.list_namespaced_pod(namespace) return [{ 'name': pod.metadata.name, 'status': pod.status.phase, 'ready': sum(1 for c in pod.status.container_statuses or [] if c.ready), 'total_containers': len(pod.spec.containers), 'restarts': sum(c.restart_count for c in pod.status.container_statuses or []), 'age': self._calculate_age(pod.metadata.creation_timestamp), } for pod in pods.items] except ApiException as e: console.print(f"[red]Error fetching pods:[/red] {e}") return [] def get_configmaps(self, namespace: str) -> List[Dict[str, Any]]: """Get list of ConfigMaps in namespace.""" try: configmaps = self.v1.list_namespaced_config_map(namespace) return [{ 'name': cm.metadata.name, 'data_keys': list(cm.data.keys()) if cm.data else [], 'age': self._calculate_age(cm.metadata.creation_timestamp), } for cm in configmaps.items] except ApiException as e: console.print(f"[red]Error fetching ConfigMaps:[/red] {e}") return [] def get_configmap_data(self, namespace: str, name: str) -> Optional[Dict[str, str]]: """Get ConfigMap data.""" try: cm = self.v1.read_namespaced_config_map(name, namespace) return cm.data except ApiException as e: console.print(f"[red]Error reading ConfigMap:[/red] {e}") return None def update_configmap(self, namespace: str, name: str, data: Dict[str, str]) -> bool: """Update ConfigMap data.""" try: # Read the existing ConfigMap first cm = self.v1.read_namespaced_config_map(name, namespace) # Update only the data field cm.data = data # Patch the ConfigMap self.v1.patch_namespaced_config_map(name, namespace, cm) console.print(f"[green]✓[/green] ConfigMap {name} updated successfully") return True except ApiException as e: console.print(f"[red]Error updating ConfigMap:[/red] {e}") return False def restart_deployment(self, namespace: str, name: str) -> bool: """Restart deployment by updating annotation.""" try: now = datetime.utcnow().isoformat() + "Z" body = { 'spec': { 'template': { 'metadata': { 'annotations': { 'kubectl.kubernetes.io/restartedAt': now } } } } } self.apps_v1.patch_namespaced_deployment(name, namespace, body) console.print(f"[green]✓[/green] Deployment {name} restarted") return True except ApiException as e: console.print(f"[red]Error restarting deployment:[/red] {e}") return False def scale_deployment(self, namespace: str, name: str, replicas: int) -> bool: """Scale deployment to specified number of replicas.""" try: body = {'spec': {'replicas': replicas}} self.apps_v1.patch_namespaced_deployment_scale(name, namespace, body) console.print(f"[green]✓[/green] Deployment {name} scaled to {replicas} replicas") return True except ApiException as e: console.print(f"[red]Error scaling deployment:[/red] {e}") return False def get_pod_logs(self, namespace: str, pod_name: str, container: Optional[str] = None, tail_lines: int = 100) -> Optional[str]: """Get logs from pod.""" try: kwargs = {'name': pod_name, 'namespace': namespace, 'tail_lines': tail_lines} if container: kwargs['container'] = container logs = self.v1.read_namespaced_pod_log(**kwargs) return logs except ApiException as e: console.print(f"[red]Error fetching logs:[/red] {e}") return None def get_pod_containers(self, namespace: str, pod_name: str) -> List[str]: """Get list of containers in pod.""" try: pod = self.v1.read_namespaced_pod(pod_name, namespace) return [container.name for container in pod.spec.containers] except ApiException as e: console.print(f"[red]Error fetching pod info:[/red] {e}") return [] @staticmethod def _calculate_age(timestamp: datetime) -> str: """Calculate age from timestamp.""" if not timestamp: return "Unknown" now = datetime.now(timestamp.tzinfo) delta = now - timestamp if delta.days > 0: return f"{delta.days}d" elif delta.seconds >= 3600: return f"{delta.seconds // 3600}h" elif delta.seconds >= 60: return f"{delta.seconds // 60}m" else: return f"{delta.seconds}s" def display_deployments_table(self, deployments: List[Dict[str, Any]]): """Display deployments in a table.""" table = Table(title="Deployments") table.add_column("Name", style="cyan") table.add_column("Replicas", style="magenta") table.add_column("Available", style="green") table.add_column("Ready", style="yellow") for dep in deployments: table.add_row( dep['name'], str(dep['replicas']), str(dep['available']), str(dep['ready']) ) console.print(table) def display_pods_table(self, pods: List[Dict[str, Any]]): """Display pods in a table.""" table = Table(title="Pods") table.add_column("Name", style="cyan") table.add_column("Status", style="magenta") table.add_column("Ready", style="green") table.add_column("Restarts", style="yellow") table.add_column("Age", style="blue") for pod in pods: status_color = "green" if pod['status'] == "Running" else "red" table.add_row( pod['name'], f"[{status_color}]{pod['status']}[/{status_color}]", f"{pod['ready']}/{pod['total_containers']}", str(pod['restarts']), pod['age'] ) console.print(table) def display_configmaps_table(self, configmaps: List[Dict[str, Any]]): """Display ConfigMaps in a table.""" table = Table(title="ConfigMaps") table.add_column("Name", style="cyan") table.add_column("Keys", style="magenta") table.add_column("Age", style="blue") for cm in configmaps: table.add_row( cm['name'], ", ".join(cm['data_keys']) if cm['data_keys'] else "None", cm['age'] ) console.print(table) def display_contexts_table(self, contexts: List[Dict[str, str]]): """Display contexts in a table.""" table = Table(title="Kubernetes Contexts") table.add_column("Name", style="cyan") table.add_column("Cluster", style="magenta") table.add_column("User", style="yellow") table.add_column("Active", style="green") for ctx in contexts: active_mark = "✓" if ctx['is_active'] else "" table.add_row( ctx['name'], ctx['cluster'], ctx['user'], active_mark ) console.print(table)