"""Main application entry point.""" import sys import os import subprocess import tempfile import yaml from typing import Optional, Dict import questionary from questionary import Style from rich.console import Console from rich.panel import Panel from rich.syntax import Syntax from k8s_tool.k8s_client import K8sClient from k8s_tool.config import ConfigManager console = Console() # Custom style for questionary prompts custom_style = Style([ ('qmark', 'fg:#673ab7 bold'), ('question', 'bold'), ('answer', 'fg:#f44336 bold'), ('pointer', 'fg:#673ab7 bold'), ('highlighted', 'fg:#673ab7 bold'), ('selected', 'fg:#cc5454'), ('separator', 'fg:#cc5454'), ('instruction', ''), ('text', ''), ]) class K8sTool: """Main application class.""" def __init__(self): """Initialize the application.""" self.k8s_client = K8sClient() self.config = ConfigManager() self.current_namespace: Optional[str] = None def run(self): """Run the main application loop.""" console.print(Panel.fit( "[bold cyan]K8s Tool[/bold cyan]\n" "[dim]Interactive kubectl helper[/dim]\n" "[dim]Tip: Press Ctrl+C to cancel and go back[/dim]", border_style="cyan" )) while True: try: self._main_menu() except KeyboardInterrupt: console.print("\n[yellow]Goodbye! 👋[/yellow]") sys.exit(0) except Exception as e: console.print(f"[red]Error:[/red] {e}") if questionary.confirm("Continue?", style=custom_style).ask(): continue else: break def _main_menu(self): """Display main menu.""" # Display current context current_context = self.k8s_client.current_context or "[dim]unknown[/dim]" console.print(f"[bold]Current context:[/bold] [cyan]{current_context}[/cyan]") # Display current namespace namespace_info = f"[cyan]{self.current_namespace}[/cyan]" if self.current_namespace else "[dim]not selected[/dim]" is_favorite = self.config.is_favorite(self.current_namespace) if self.current_namespace else False fav_indicator = " ⭐" if is_favorite else "" console.print(f"[bold]Current namespace:[/bold] {namespace_info}{fav_indicator}") choices = [ "Manage Contexts", "Select Namespace", "Manage Favorites", "List Deployments", "Restart Deployment", "Scale Deployment", "Edit Deployment", "View ConfigMaps", "Edit ConfigMap", "View Pod Logs", "Exit" ] action = questionary.select( "What would you like to do?", choices=choices, style=custom_style ).ask() if not action: # User pressed Esc return if action == "Manage Contexts": self._manage_contexts() elif action == "Select Namespace": self._select_namespace() elif action == "Manage Favorites": self._manage_favorites() elif action == "List Deployments": self._list_deployments() elif action == "Restart Deployment": self._restart_deployment() elif action == "Scale Deployment": self._scale_deployment() elif action == "Edit Deployment": self._edit_deployment() elif action == "View ConfigMaps": self._view_configmaps() elif action == "Edit ConfigMap": self._edit_configmaps() elif action == "View Pod Logs": self._view_pod_logs() elif action == "Exit": console.print("[yellow]Goodbye! 👋[/yellow]") sys.exit(0) def _select_namespace(self): """Select namespace.""" console.print("[dim]Fetching namespaces...[/dim]") namespaces = self.k8s_client.get_namespaces() if not namespaces: console.print("[red]No namespaces found[/red]") return # Get favorites favorites = self.config.get_favorites() # Separate favorites and non-favorites fav_namespaces = [ns for ns in namespaces if ns in favorites] other_namespaces = [ns for ns in namespaces if ns not in favorites] # Create choices with stars for favorites choices = [] if fav_namespaces: choices.extend([f"⭐ {ns}" for ns in fav_namespaces]) choices.extend(other_namespaces) # Use autocomplete for filtering selected = questionary.autocomplete( "Select namespace (type to filter):", choices=choices, style=custom_style, match_middle=True ).ask() if selected: # Remove star prefix if present namespace = selected.replace("⭐ ", "") self.current_namespace = namespace is_fav = self.config.is_favorite(namespace) fav_text = " (favorite)" if is_fav else "" console.print(f"[green]✓[/green] Namespace set to: [cyan]{namespace}[/cyan]{fav_text}") def _manage_contexts(self): """Manage Kubernetes contexts.""" choices = [ "View all contexts", "Switch context", "Add new context", "Delete context", "Back to main menu" ] action = questionary.select( "Context management:", choices=choices, style=custom_style ).ask() if not action or action == "Back to main menu": return if action == "View all contexts": self._view_contexts() elif action == "Switch context": self._switch_context() elif action == "Add new context": self._add_context() elif action == "Delete context": self._delete_context() def _view_contexts(self): """View all available contexts.""" console.print("[dim]Fetching contexts...[/dim]") contexts = self.k8s_client.get_contexts() if not contexts: console.print("[red]No contexts found[/red]") return self.k8s_client.display_contexts_table(contexts) def _switch_context(self): """Switch Kubernetes context.""" console.print("[dim]Fetching contexts...[/dim]") contexts = self.k8s_client.get_contexts() if not contexts: console.print("[red]No contexts found[/red]") return # Show contexts table self.k8s_client.display_contexts_table(contexts) # Filter out current context for selection available_contexts = [ctx['name'] for ctx in contexts if not ctx['is_active']] if not available_contexts: console.print("[yellow]Only one context available[/yellow]") return # Add option to cancel choices = available_contexts + [questionary.Separator(), "Cancel"] selected = questionary.select( "Select context to switch to:", choices=choices, style=custom_style ).ask() if selected and selected != "Cancel": if self.k8s_client.switch_context(selected): # Reset namespace selection after context switch self.current_namespace = None console.print("[yellow]Note:[/yellow] Namespace selection reset. Please select a namespace.") def _add_context(self): """Add a new Kubernetes context.""" console.print("\n[bold]Add New Context[/bold]\n") # Get context details context_name = questionary.text( "Context name:", style=custom_style ).ask() if not context_name: return cluster_name = questionary.text( "Cluster name:", default=context_name, style=custom_style ).ask() if not cluster_name: return server = questionary.text( "API server URL (e.g., https://kubernetes.example.com:6443):", style=custom_style ).ask() if not server: return user_name = questionary.text( "User name:", default=context_name + "-user", style=custom_style ).ask() if not user_name: return # Optional: token has_token = questionary.confirm( "Do you have an authentication token?", style=custom_style, default=False ).ask() if has_token is None: # User pressed Esc return token = None if has_token: token = questionary.password( "Authentication token:", style=custom_style ).ask() if token is None: # User pressed Esc return # Optional: certificate has_cert = questionary.confirm( "Do you have a certificate authority data?", style=custom_style, default=False ).ask() if has_cert is None: # User pressed Esc return cert = None if has_cert: cert = questionary.text( "Certificate authority data (base64 encoded):", style=custom_style ).ask() if not cert: # User pressed Esc return # Confirm console.print("\n[bold]Review:[/bold]") console.print(f" Context: [cyan]{context_name}[/cyan]") console.print(f" Cluster: [cyan]{cluster_name}[/cyan]") console.print(f" Server: [cyan]{server}[/cyan]") console.print(f" User: [cyan]{user_name}[/cyan]") console.print(f" Token: [cyan]{'***' if token else 'None'}[/cyan]") console.print(f" Certificate: [cyan]{'***' if cert else 'None (insecure)'}[/cyan]") confirm = questionary.confirm("\nAdd this context?", style=custom_style, default=False).ask() if confirm: self.k8s_client.add_context(context_name, cluster_name, user_name, server, cert, token) def _delete_context(self): """Delete a Kubernetes context.""" console.print("[dim]Fetching contexts...[/dim]") contexts = self.k8s_client.get_contexts() if not contexts: console.print("[red]No contexts found[/red]") return # Filter out current context deletable_contexts = [ctx['name'] for ctx in contexts if not ctx['is_active']] if not deletable_contexts: console.print("[yellow]No contexts available for deletion (cannot delete current context)[/yellow]") return context_name = questionary.select( "Select context to delete:", choices=deletable_contexts + [questionary.Separator(), "Cancel"], style=custom_style ).ask() if not context_name or context_name == "Cancel": return confirm = questionary.confirm( f"Are you sure you want to delete context '{context_name}'?", style=custom_style, default=False ).ask() if confirm: self.k8s_client.delete_context(context_name) def _manage_favorites(self): """Manage favorite namespaces.""" choices = [] if self.current_namespace: if self.config.is_favorite(self.current_namespace): choices.append(f"Remove '{self.current_namespace}' from favorites") else: choices.append(f"Add '{self.current_namespace}' to favorites") choices.extend([ "View all favorites", "Add namespace to favorites", "Remove namespace from favorites", "Clear all favorites", "Back to main menu" ]) action = questionary.select( "Favorites management:", choices=choices, style=custom_style ).ask() if not action or action == "Back to main menu": return if action.startswith("Add '") and action.endswith("' to favorites"): # Add current namespace self.config.add_favorite(self.current_namespace) elif action.startswith("Remove '") and action.endswith("' from favorites"): # Remove current namespace self.config.remove_favorite(self.current_namespace) elif action == "View all favorites": self._view_favorites() elif action == "Add namespace to favorites": self._add_namespace_to_favorites() elif action == "Remove namespace from favorites": self._remove_namespace_from_favorites() elif action == "Clear all favorites": confirm = questionary.confirm("Are you sure you want to clear all favorites?", style=custom_style, default=False).ask() if confirm: self.config.clear_favorites() def _view_favorites(self): """View all favorite namespaces.""" favorites = self.config.get_favorites() if not favorites: console.print("[yellow]No favorites yet[/yellow]") return console.print("\n[bold]Favorite namespaces:[/bold]") for ns in favorites: indicator = " [dim](current)[/dim]" if ns == self.current_namespace else "" console.print(f" ⭐ [cyan]{ns}[/cyan]{indicator}") console.print() def _add_namespace_to_favorites(self): """Add a namespace to favorites.""" console.print("[dim]Fetching namespaces...[/dim]") namespaces = self.k8s_client.get_namespaces() if not namespaces: console.print("[red]No namespaces found[/red]") return # Filter out already favorite namespaces favorites = self.config.get_favorites() available = [ns for ns in namespaces if ns not in favorites] if not available: console.print("[yellow]All namespaces are already in favorites[/yellow]") return namespace = questionary.select( "Select namespace to add to favorites:", choices=available, style=custom_style ).ask() if namespace: self.config.add_favorite(namespace) def _remove_namespace_from_favorites(self): """Remove a namespace from favorites.""" favorites = self.config.get_favorites() if not favorites: console.print("[yellow]No favorites to remove[/yellow]") return namespace = questionary.select( "Select namespace to remove from favorites:", choices=favorites, style=custom_style ).ask() if namespace: self.config.remove_favorite(namespace) def _ensure_namespace_selected(self) -> bool: """Ensure namespace is selected.""" if not self.current_namespace: console.print("[yellow]Please select a namespace first[/yellow]") self._select_namespace() return self.current_namespace is not None return True def _list_deployments(self): """List deployments in current namespace.""" if not self._ensure_namespace_selected(): return console.print(f"[dim]Fetching deployments in {self.current_namespace}...[/dim]") deployments = self.k8s_client.get_deployments(self.current_namespace) if not deployments: console.print("[yellow]No deployments found[/yellow]") return self.k8s_client.display_deployments_table(deployments) view_pods = questionary.confirm("View pods for a deployment?", style=custom_style, default=False).ask() if view_pods: dep_names = [dep['name'] for dep in deployments] dep_name = questionary.select( "Select deployment:", choices=dep_names, style=custom_style ).ask() if dep_name: self._view_deployment_pods(dep_name) def _view_deployment_pods(self, deployment_name: str): """View pods for a deployment.""" label_selector = f"app={deployment_name}" console.print(f"[dim]Fetching pods for deployment {deployment_name}...[/dim]") pods = self.k8s_client.get_pods(self.current_namespace, label_selector=label_selector) if not pods: # Try without label selector console.print("[yellow]No pods found with label selector, fetching all pods...[/yellow]") pods = self.k8s_client.get_pods(self.current_namespace) if pods: self.k8s_client.display_pods_table(pods) else: console.print("[yellow]No pods found[/yellow]") def _restart_deployment(self): """Restart a deployment.""" if not self._ensure_namespace_selected(): return console.print(f"[dim]Fetching deployments in {self.current_namespace}...[/dim]") deployments = self.k8s_client.get_deployments(self.current_namespace) if not deployments: console.print("[yellow]No deployments found[/yellow]") return dep_names = [dep['name'] for dep in deployments] dep_name = questionary.select( "Select deployment to restart:", choices=dep_names, style=custom_style ).ask() if not dep_name: return confirm = questionary.confirm( f"Are you sure you want to restart {dep_name}?", style=custom_style, default=False ).ask() if confirm: self.k8s_client.restart_deployment(self.current_namespace, dep_name) def _scale_deployment(self): """Scale a deployment.""" if not self._ensure_namespace_selected(): return console.print(f"[dim]Fetching deployments in {self.current_namespace}...[/dim]") deployments = self.k8s_client.get_deployments(self.current_namespace) if not deployments: console.print("[yellow]No deployments found[/yellow]") return dep_names = [f"{dep['name']} (current: {dep['replicas']})" for dep in deployments] selected = questionary.select( "Select deployment to scale:", choices=dep_names, style=custom_style ).ask() if not selected: return dep_name = selected.split(" (current:")[0] replicas = questionary.text( "Enter number of replicas:", validate=lambda text: text.isdigit() and int(text) >= 0, style=custom_style ).ask() if replicas: self.k8s_client.scale_deployment(self.current_namespace, dep_name, int(replicas)) def _edit_deployment(self): """Edit a deployment.""" if not self._ensure_namespace_selected(): return console.print(f"[dim]Fetching deployments in {self.current_namespace}...[/dim]") deployments = self.k8s_client.get_deployments(self.current_namespace) if not deployments: console.print("[yellow]No deployments found[/yellow]") return dep_names = [dep['name'] for dep in deployments] dep_name = questionary.select( "Select deployment to edit:", choices=dep_names, style=custom_style ).ask() if not dep_name: return # Get deployment YAML deployment_yaml = self.k8s_client.get_deployment_yaml(self.current_namespace, dep_name) if deployment_yaml: self._edit_deployment_yaml(dep_name, deployment_yaml) def _edit_deployment_yaml(self, dep_name: str, current_yaml: str): """Edit deployment YAML in text editor.""" # Get editor from environment or use default editor = os.environ.get('EDITOR', os.environ.get('VISUAL', 'vi')) # Create temporary file with deployment YAML with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as tf: temp_file = tf.name tf.write(current_yaml) try: # Get modification time before editing mtime_before = os.path.getmtime(temp_file) # Open editor subprocess.call([editor, temp_file]) # Check if file was modified mtime_after = os.path.getmtime(temp_file) if mtime_after == mtime_before: console.print("[yellow]No changes made[/yellow]") return # Read edited data with open(temp_file, 'r') as f: try: new_yaml = f.read() # Validate YAML yaml.safe_load(new_yaml) except yaml.YAMLError as e: console.print(f"[red]Error parsing YAML:[/red] {e}") if questionary.confirm("Retry editing?", style=custom_style, default=False).ask(): self._edit_deployment_yaml(dep_name, current_yaml) return # Show confirmation console.print("\n[bold]Deployment YAML has been modified[/bold]") console.print("[yellow]Warning:[/yellow] Editing deployment directly can be risky.") # Confirm update if questionary.confirm( f"Apply changes to deployment '{dep_name}'?", style=custom_style, default=False ).ask(): self.k8s_client.update_deployment_yaml(self.current_namespace, dep_name, new_yaml) finally: # Clean up temp file try: os.unlink(temp_file) except Exception: pass def _view_configmaps(self): """View ConfigMaps.""" if not self._ensure_namespace_selected(): return console.print(f"[dim]Fetching ConfigMaps in {self.current_namespace}...[/dim]") configmaps = self.k8s_client.get_configmaps(self.current_namespace) if not configmaps: console.print("[yellow]No ConfigMaps found[/yellow]") return self.k8s_client.display_configmaps_table(configmaps) view_data = questionary.confirm("View ConfigMap data?", style=custom_style, default=False).ask() if view_data: cm_names = [cm['name'] for cm in configmaps] cm_name = questionary.select( "Select ConfigMap:", choices=cm_names, style=custom_style ).ask() if cm_name: data = self.k8s_client.get_configmap_data(self.current_namespace, cm_name) if data: console.print(f"\n[bold]ConfigMap:[/bold] [cyan]{cm_name}[/cyan]\n") for key, value in data.items(): console.print(f"[bold cyan]--- {key} ---[/bold cyan]") console.print(Syntax(value, "yaml", line_numbers=False, background_color="default")) console.print() def _edit_configmaps(self): """Edit a ConfigMap.""" if not self._ensure_namespace_selected(): return console.print(f"[dim]Fetching ConfigMaps in {self.current_namespace}...[/dim]") configmaps = self.k8s_client.get_configmaps(self.current_namespace) if not configmaps: console.print("[yellow]No ConfigMaps found[/yellow]") return cm_names = [cm['name'] for cm in configmaps] cm_name = questionary.select( "Select ConfigMap to edit:", choices=cm_names, style=custom_style ).ask() if cm_name: data = self.k8s_client.get_configmap_data(self.current_namespace, cm_name) if data: self._edit_configmap(cm_name, data) def _edit_configmap(self, cm_name: str, current_data: Dict[str, str]): """Edit ConfigMap data in text editor.""" # Get editor from environment or use default editor = os.environ.get('EDITOR', os.environ.get('VISUAL', 'vi')) # Create temporary file with ConfigMap data with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as tf: temp_file = tf.name # Write current data as YAML yaml.dump(current_data, tf, default_flow_style=False, allow_unicode=True) try: # Get modification time before editing mtime_before = os.path.getmtime(temp_file) # Open editor subprocess.call([editor, temp_file]) # Check if file was modified mtime_after = os.path.getmtime(temp_file) if mtime_after == mtime_before: console.print("[yellow]No changes made[/yellow]") return # Read edited data with open(temp_file, 'r') as f: try: new_data = yaml.safe_load(f) except yaml.YAMLError as e: console.print(f"[red]Error parsing YAML:[/red] {e}") if questionary.confirm("Retry editing?", style=custom_style, default=False).ask(): self._edit_configmap(cm_name, current_data) return # Validate that new_data is a dict if not isinstance(new_data, dict): console.print("[red]Error:[/red] ConfigMap data must be a dictionary") if questionary.confirm("Retry editing?", style=custom_style, default=False).ask(): self._edit_configmap(cm_name, current_data) return # Convert all values to strings (ConfigMap requirement) new_data = {k: str(v) if not isinstance(v, str) else v for k, v in new_data.items()} # Show diff console.print("\n[bold]Changes:[/bold]") removed_keys = set(current_data.keys()) - set(new_data.keys()) added_keys = set(new_data.keys()) - set(current_data.keys()) modified_keys = {k for k in current_data.keys() & new_data.keys() if current_data[k] != new_data[k]} if removed_keys: console.print(f"[red]Removed keys:[/red] {', '.join(removed_keys)}") if added_keys: console.print(f"[green]Added keys:[/green] {', '.join(added_keys)}") if modified_keys: console.print(f"[yellow]Modified keys:[/yellow] {', '.join(modified_keys)}") if not (removed_keys or added_keys or modified_keys): console.print("[yellow]No changes detected[/yellow]") return # Confirm update if questionary.confirm( f"Apply changes to ConfigMap '{cm_name}'?", style=custom_style, default=False ).ask(): self.k8s_client.update_configmap(self.current_namespace, cm_name, new_data) finally: # Clean up temp file try: os.unlink(temp_file) except Exception: pass def _view_pod_logs(self): """View pod logs.""" if not self._ensure_namespace_selected(): return console.print(f"[dim]Fetching pods in {self.current_namespace}...[/dim]") pods = self.k8s_client.get_pods(self.current_namespace) if not pods: console.print("[yellow]No pods found[/yellow]") return pod_names = [f"{pod['name']} (Status: {pod['status']}, Age: {pod['age']})" for pod in pods] selected = questionary.select( "Select pod:", choices=pod_names, style=custom_style ).ask() if not selected: return pod_name = selected.split(" (Status:")[0] # Check if pod has multiple containers containers = self.k8s_client.get_pod_containers(self.current_namespace, pod_name) container = None if len(containers) > 1: container = questionary.select( "Select container:", choices=containers, style=custom_style ).ask() if not container: # User pressed Esc return elif len(containers) == 1: container = containers[0] tail_lines = questionary.text( "Number of lines to show (default: 100):", default="100", validate=lambda text: text.isdigit() and int(text) > 0, style=custom_style ).ask() if tail_lines: logs = self.k8s_client.get_pod_logs( self.current_namespace, pod_name, container=container, tail_lines=int(tail_lines) ) if logs: title = f"[bold cyan]Logs: {pod_name}" if container: title += f" [{container}]" title += "[/bold cyan]" console.print(f"\n{title}\n") console.print(Syntax(logs, "log", line_numbers=False, background_color="default")) console.print() def main(): """Main entry point.""" try: app = K8sTool() app.run() except KeyboardInterrupt: console.print("\n[yellow]Goodbye! 👋[/yellow]") sys.exit(0) except Exception as e: console.print(f"[red]Fatal error:[/red] {e}") sys.exit(1) if __name__ == "__main__": main()