k8s-tool/k8s_tool/main.py
2025-10-27 02:02:40 +03:00

885 lines
29 KiB
Python

"""Main application entry point."""
import sys
import os
import subprocess
import tempfile
import yaml
from typing import Optional, Dict
import questionary
from questionary import Style
from rich.console import Console
from rich.panel import Panel
from rich.syntax import Syntax
from k8s_tool.k8s_client import K8sClient
from k8s_tool.config import ConfigManager
# Single Rich console shared by everything in this module for terminal output.
console = Console()

# Custom style for questionary prompts: each entry maps a prompt element
# (question mark, pointer, highlighted row, ...) to a prompt_toolkit style string.
# An empty style string leaves that element with its default appearance.
custom_style = Style(
    [
        ("qmark", "fg:#673ab7 bold"),
        ("question", "bold"),
        ("answer", "fg:#f44336 bold"),
        ("pointer", "fg:#673ab7 bold"),
        ("highlighted", "fg:#673ab7 bold"),
        ("selected", "fg:#cc5454"),
        ("separator", "fg:#cc5454"),
        ("instruction", ""),
        ("text", ""),
    ]
)
class K8sTool:
    """Interactive, menu-driven front end for common Kubernetes operations.

    All cluster access goes through ``self.k8s_client`` and favorite-namespace
    bookkeeping goes through ``self.config``; this class only drives the
    prompts (questionary) and the output (the module-level Rich ``console``).
    The namespace most operations act on is kept in ``self.current_namespace``.
    """

    # Marker appended to the current namespace in the main menu when it is a
    # favorite (the same star that ``_view_favorites`` prints).
    _FAVORITE_MARK = " ⭐"

    def __init__(self):
        """Create the cluster client and config manager; no namespace is selected yet."""
        self.k8s_client = K8sClient()
        self.config = ConfigManager()
        self.current_namespace: Optional[str] = None

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _escape_markup(value) -> str:
        """Return ``str(value)`` with Rich markup tags escaped.

        Needed whenever text that is not under our control (exception messages,
        resource names wrapped in brackets) is interpolated into a string that
        Rich will parse for ``[tag]`` markup.
        """
        # Local import: rich is already a dependency of this module, but the
        # ``escape`` helper is not among its top-level imports.
        from rich.markup import escape

        return escape(str(value))

    @staticmethod
    def _validate_count(text: str, minimum: int = 0):
        """Validate a whole-number prompt answer.

        Args:
            text: Raw text typed by the user.
            minimum: Smallest accepted value.

        Returns:
            ``True`` when ``text`` is a base-10 integer >= ``minimum``;
            otherwise an error message string (questionary shows a string
            verdict as the validation error).
        """
        # isdecimal() only accepts characters int() can parse; isdigit() also
        # accepts e.g. superscript digits, for which int() raises ValueError.
        if text.isdecimal() and int(text) >= minimum:
            return True
        return f"Please enter a whole number of at least {minimum}"

    @staticmethod
    def _editor_command() -> list:
        """Return the editor command line as an argv list.

        ``$EDITOR`` is preferred over ``$VISUAL``; ``vi`` is the fallback.
        Empty variables are skipped, and values that carry arguments
        (``code --wait``, ``emacs -nw``) are split into separate argv items.
        """
        import shlex

        editor = os.environ.get('EDITOR') or os.environ.get('VISUAL') or 'vi'
        # Non-POSIX splitting on Windows keeps backslashes in paths intact.
        return shlex.split(editor, posix=(os.name != 'nt')) or ['vi']

    @staticmethod
    def _normalize_configmap_data(data: dict) -> Dict[str, str]:
        """Return ``data`` with every key and value converted to ``str``.

        YAML may load keys and values as ints, bools, etc.; values are
        stringified because ConfigMap data must be strings, and keys are
        stringified so they can be compared with the existing keys and joined
        for display.
        NOTE(review): values use Python's ``str()``, so an unquoted YAML
        ``true`` becomes ``"True"`` — quote values in the editor to keep them
        verbatim.
        """
        return {str(k): v if isinstance(v, str) else str(v) for k, v in data.items()}

    @staticmethod
    def _diff_configmap_keys(old: Dict[str, str], new: Dict[str, str]) -> tuple:
        """Compare two ConfigMap data dicts.

        Returns:
            ``(removed, added, modified)`` — three sorted lists of keys.
        """
        removed = sorted(old.keys() - new.keys())
        added = sorted(new.keys() - old.keys())
        modified = sorted(k for k in old.keys() & new.keys() if old[k] != new[k])
        return removed, added, modified

    def _select(self, message: str, choices):
        """Show a single-choice list prompt and return the answer.

        Callers treat a falsy answer (``None``) as "cancelled".
        """
        return questionary.select(message, choices=choices, style=custom_style).ask()

    def _confirm(self, message: str) -> Optional[bool]:
        """Ask a yes/no question that defaults to "no"; ``None`` means cancelled."""
        return questionary.confirm(message, style=custom_style, default=False).ask()

    def _fetch_deployments(self):
        """Return the deployments of the current namespace, or ``None``.

        ``None`` is returned (after telling the user why) when no namespace
        could be selected or the namespace has no deployments.
        """
        if not self._ensure_namespace_selected():
            return None
        console.print(f"[dim]Fetching deployments in {self.current_namespace}...[/dim]")
        deployments = self.k8s_client.get_deployments(self.current_namespace)
        if not deployments:
            console.print("[yellow]No deployments found[/yellow]")
            return None
        return deployments

    def _fetch_configmaps(self):
        """Return the ConfigMaps of the current namespace, or ``None``.

        ``None`` is returned (after telling the user why) when no namespace
        could be selected or the namespace has no ConfigMaps.
        """
        if not self._ensure_namespace_selected():
            return None
        console.print(f"[dim]Fetching ConfigMaps in {self.current_namespace}...[/dim]")
        configmaps = self.k8s_client.get_configmaps(self.current_namespace)
        if not configmaps:
            console.print("[yellow]No ConfigMaps found[/yellow]")
            return None
        return configmaps

    def _edit_text_in_editor(self, initial_text: str, parse) -> Optional[tuple]:
        """Let the user edit ``initial_text`` in their editor until it parses.

        Args:
            initial_text: Content the temporary ``.yaml`` file starts with.
            parse: Callable taking the edited text and returning the parsed
                value; it signals bad input by raising ``yaml.YAMLError`` or
                ``ValueError``.

        Returns:
            ``(edited_text, parsed_value)`` on success, or ``None`` when the
            editor failed, nothing was changed, or the user declined to retry
            after a parse error.
        """
        command = self._editor_command()
        with tempfile.NamedTemporaryFile(
            mode='w', suffix='.yaml', delete=False, encoding='utf-8'
        ) as tf:
            temp_file = tf.name
            tf.write(initial_text)
        try:
            while True:
                status = subprocess.call(command + [temp_file])
                if status != 0:
                    # A non-zero exit (e.g. vi's ":cq") means the edit was aborted.
                    console.print(
                        f"[red]Error:[/red] Editor exited with status {status}; changes discarded"
                    )
                    return None
                with open(temp_file, 'r', encoding='utf-8') as f:
                    edited = f.read()
                # Compare content rather than mtime: timestamps can be too
                # coarse to notice a quick save, and a save without edits is
                # not a change.
                if edited == initial_text:
                    console.print("[yellow]No changes made[/yellow]")
                    return None
                try:
                    return edited, parse(edited)
                except yaml.YAMLError as e:
                    console.print(f"[red]Error parsing YAML:[/red] {self._escape_markup(e)}")
                except ValueError as e:
                    console.print(f"[red]Error:[/red] {self._escape_markup(e)}")
                # Retrying reopens the same file, so the user fixes their edit
                # instead of starting over from the original content.
                if not self._confirm("Retry editing?"):
                    return None
        finally:
            # Best-effort cleanup of the temporary file.
            try:
                os.unlink(temp_file)
            except OSError:
                pass

    # ------------------------------------------------------------------
    # Main loop and menu
    # ------------------------------------------------------------------

    def run(self):
        """Run the main application loop.

        Shows the banner, then displays the main menu repeatedly. A
        ``KeyboardInterrupt`` exits with status 0. Any other exception is
        reported and the user is asked whether to continue; the loop ends
        when they decline.
        """
        console.print(Panel.fit(
            "[bold cyan]K8s Tool[/bold cyan]\n"
            "[dim]Interactive kubectl helper[/dim]\n"
            "[dim]Tip: Press Ctrl+C to cancel and go back[/dim]",
            border_style="cyan"
        ))
        while True:
            try:
                self._main_menu()
            except KeyboardInterrupt:
                console.print("\n[yellow]Goodbye! 👋[/yellow]")
                sys.exit(0)
            except Exception as e:
                # Escape the message so brackets in it are not parsed as markup.
                console.print(f"[red]Error:[/red] {self._escape_markup(e)}")
                if not questionary.confirm("Continue?", style=custom_style).ask():
                    break

    def _main_menu(self):
        """Print the current context/namespace and dispatch one menu action."""
        current_context = self.k8s_client.current_context or "[dim]unknown[/dim]"
        console.print(f"[bold]Current context:[/bold] [cyan]{current_context}[/cyan]")

        if self.current_namespace:
            namespace_info = f"[cyan]{self.current_namespace}[/cyan]"
            is_favorite = self.config.is_favorite(self.current_namespace)
        else:
            namespace_info = "[dim]not selected[/dim]"
            is_favorite = False
        fav_indicator = self._FAVORITE_MARK if is_favorite else ""
        console.print(f"[bold]Current namespace:[/bold] {namespace_info}{fav_indicator}")

        # Menu label -> handler; insertion order is the display order.
        handlers = {
            "Manage Contexts": self._manage_contexts,
            "Select Namespace": self._select_namespace,
            "Manage Favorites": self._manage_favorites,
            "List Deployments": self._list_deployments,
            "Restart Deployment": self._restart_deployment,
            "Scale Deployment": self._scale_deployment,
            "Edit Deployment": self._edit_deployment,
            "View ConfigMaps": self._view_configmaps,
            "Edit ConfigMap": self._edit_configmaps,
            "View Pod Logs": self._view_pod_logs,
        }
        action = self._select("What would you like to do?", list(handlers) + ["Exit"])
        if not action:
            # Prompt was cancelled; the caller redraws the menu.
            return
        if action == "Exit":
            console.print("[yellow]Goodbye! 👋[/yellow]")
            sys.exit(0)
        handlers[action]()

    # ------------------------------------------------------------------
    # Namespace selection
    # ------------------------------------------------------------------

    def _select_namespace(self):
        """Prompt for a namespace (favorites listed first) and make it current.

        The autocomplete prompt returns whatever text was typed, so the answer
        is checked against the fetched namespace list before it is accepted.
        """
        console.print("[dim]Fetching namespaces...[/dim]")
        namespaces = self.k8s_client.get_namespaces()
        if not namespaces:
            console.print("[red]No namespaces found[/red]")
            return

        favorites = self.config.get_favorites()
        # Favorites first, everything else after, each in the fetched order.
        choices = [ns for ns in namespaces if ns in favorites]
        choices.extend(ns for ns in namespaces if ns not in favorites)

        selected = questionary.autocomplete(
            "Select namespace (type to filter):",
            choices=choices,
            style=custom_style,
            match_middle=True
        ).ask()
        if not selected:
            return

        namespace = selected.strip()
        if namespace not in namespaces:
            console.print(f"[red]Namespace not found:[/red] {self._escape_markup(namespace)}")
            return

        self.current_namespace = namespace
        fav_text = " (favorite)" if self.config.is_favorite(namespace) else ""
        console.print(f"[green]✓[/green] Namespace set to: [cyan]{namespace}[/cyan]{fav_text}")

    def _ensure_namespace_selected(self) -> bool:
        """Return True if a namespace is selected, prompting for one if needed."""
        if self.current_namespace:
            return True
        console.print("[yellow]Please select a namespace first[/yellow]")
        self._select_namespace()
        return self.current_namespace is not None

    # ------------------------------------------------------------------
    # Contexts
    # ------------------------------------------------------------------

    def _manage_contexts(self):
        """Show the context sub-menu and run the chosen action."""
        handlers = {
            "View all contexts": self._view_contexts,
            "Switch context": self._switch_context,
            "Add new context": self._add_context,
            "Delete context": self._delete_context,
        }
        action = self._select("Context management:", list(handlers) + ["Back to main menu"])
        # A cancelled prompt or "Back to main menu" has no handler: just return.
        handler = handlers.get(action)
        if handler:
            handler()

    def _view_contexts(self):
        """Fetch all contexts and display them as a table."""
        console.print("[dim]Fetching contexts...[/dim]")
        contexts = self.k8s_client.get_contexts()
        if not contexts:
            console.print("[red]No contexts found[/red]")
            return
        self.k8s_client.display_contexts_table(contexts)

    def _switch_context(self):
        """Switch to another context and reset the namespace selection on success."""
        console.print("[dim]Fetching contexts...[/dim]")
        contexts = self.k8s_client.get_contexts()
        if not contexts:
            console.print("[red]No contexts found[/red]")
            return
        self.k8s_client.display_contexts_table(contexts)

        # The active context is not offered as a switch target.
        available_contexts = [ctx['name'] for ctx in contexts if not ctx['is_active']]
        if not available_contexts:
            console.print("[yellow]Only one context available[/yellow]")
            return

        selected = self._select(
            "Select context to switch to:",
            available_contexts + [questionary.Separator(), "Cancel"],
        )
        if not selected or selected == "Cancel":
            return
        if self.k8s_client.switch_context(selected):
            # The previously selected namespace belongs to the old context.
            self.current_namespace = None
            console.print("[yellow]Note:[/yellow] Namespace selection reset. Please select a namespace.")

    def _add_context(self):
        """Collect the details of a new context and add it after confirmation.

        Leaving any required answer empty, or cancelling any prompt, abandons
        the whole operation.
        """
        console.print("\n[bold]Add New Context[/bold]\n")

        context_name = questionary.text(
            "Context name:",
            style=custom_style
        ).ask()
        if not context_name:
            return

        cluster_name = questionary.text(
            "Cluster name:",
            default=context_name,
            style=custom_style
        ).ask()
        if not cluster_name:
            return

        server = questionary.text(
            "API server URL (e.g., https://kubernetes.example.com:6443):",
            style=custom_style
        ).ask()
        if not server:
            return

        user_name = questionary.text(
            "User name:",
            default=context_name + "-user",
            style=custom_style
        ).ask()
        if not user_name:
            return

        # Optional: token
        has_token = self._confirm("Do you have an authentication token?")
        if has_token is None:
            return
        token = None
        if has_token:
            token = questionary.password(
                "Authentication token:",
                style=custom_style
            ).ask()
            if token is None:
                return

        # Optional: certificate
        has_cert = self._confirm("Do you have a certificate authority data?")
        if has_cert is None:
            return
        cert = None
        if has_cert:
            cert = questionary.text(
                "Certificate authority data (base64 encoded):",
                style=custom_style
            ).ask()
            if not cert:
                # Cancelled, or left empty after saying a certificate exists.
                return

        # Review: secrets are masked, never echoed.
        console.print("\n[bold]Review:[/bold]")
        console.print(f" Context: [cyan]{context_name}[/cyan]")
        console.print(f" Cluster: [cyan]{cluster_name}[/cyan]")
        console.print(f" Server: [cyan]{server}[/cyan]")
        console.print(f" User: [cyan]{user_name}[/cyan]")
        console.print(f" Token: [cyan]{'***' if token else 'None'}[/cyan]")
        console.print(f" Certificate: [cyan]{'***' if cert else 'None (insecure)'}[/cyan]")

        if self._confirm("\nAdd this context?"):
            self.k8s_client.add_context(context_name, cluster_name, user_name, server, cert, token)

    def _delete_context(self):
        """Delete a non-active context after confirmation."""
        console.print("[dim]Fetching contexts...[/dim]")
        contexts = self.k8s_client.get_contexts()
        if not contexts:
            console.print("[red]No contexts found[/red]")
            return

        # The active context is never offered for deletion.
        deletable_contexts = [ctx['name'] for ctx in contexts if not ctx['is_active']]
        if not deletable_contexts:
            console.print("[yellow]No contexts available for deletion (cannot delete current context)[/yellow]")
            return

        context_name = self._select(
            "Select context to delete:",
            deletable_contexts + [questionary.Separator(), "Cancel"],
        )
        if not context_name or context_name == "Cancel":
            return
        if self._confirm(f"Are you sure you want to delete context '{context_name}'?"):
            self.k8s_client.delete_context(context_name)

    # ------------------------------------------------------------------
    # Favorites
    # ------------------------------------------------------------------

    def _manage_favorites(self):
        """Show the favorites sub-menu and run the chosen action.

        When a namespace is selected, the first entry toggles its favorite
        status.
        """
        current = self.current_namespace
        toggle_label = None
        toggle_handler = None
        if current:
            if self.config.is_favorite(current):
                toggle_label = f"Remove '{current}' from favorites"
                toggle_handler = self.config.remove_favorite
            else:
                toggle_label = f"Add '{current}' to favorites"
                toggle_handler = self.config.add_favorite

        choices = [toggle_label] if toggle_label else []
        choices.extend([
            "View all favorites",
            "Add namespace to favorites",
            "Remove namespace from favorites",
            "Clear all favorites",
            "Back to main menu"
        ])
        action = self._select("Favorites management:", choices)
        if not action or action == "Back to main menu":
            return

        if action == toggle_label:
            toggle_handler(current)
        elif action == "View all favorites":
            self._view_favorites()
        elif action == "Add namespace to favorites":
            self._add_namespace_to_favorites()
        elif action == "Remove namespace from favorites":
            self._remove_namespace_from_favorites()
        elif action == "Clear all favorites":
            if self._confirm("Are you sure you want to clear all favorites?"):
                self.config.clear_favorites()

    def _view_favorites(self):
        """Print every favorite namespace, marking the current one."""
        favorites = self.config.get_favorites()
        if not favorites:
            console.print("[yellow]No favorites yet[/yellow]")
            return
        console.print("\n[bold]Favorite namespaces:[/bold]")
        for ns in favorites:
            indicator = " [dim](current)[/dim]" if ns == self.current_namespace else ""
            console.print(f" ⭐ [cyan]{ns}[/cyan]{indicator}")
        console.print()

    def _add_namespace_to_favorites(self):
        """Pick a namespace that is not yet a favorite and add it."""
        console.print("[dim]Fetching namespaces...[/dim]")
        namespaces = self.k8s_client.get_namespaces()
        if not namespaces:
            console.print("[red]No namespaces found[/red]")
            return

        favorites = self.config.get_favorites()
        available = [ns for ns in namespaces if ns not in favorites]
        if not available:
            console.print("[yellow]All namespaces are already in favorites[/yellow]")
            return

        namespace = self._select("Select namespace to add to favorites:", available)
        if namespace:
            self.config.add_favorite(namespace)

    def _remove_namespace_from_favorites(self):
        """Pick one of the favorites and remove it."""
        favorites = self.config.get_favorites()
        if not favorites:
            console.print("[yellow]No favorites to remove[/yellow]")
            return
        namespace = self._select("Select namespace to remove from favorites:", favorites)
        if namespace:
            self.config.remove_favorite(namespace)

    # ------------------------------------------------------------------
    # Deployments
    # ------------------------------------------------------------------

    def _list_deployments(self):
        """Show the deployments table and optionally the pods of one deployment."""
        deployments = self._fetch_deployments()
        if not deployments:
            return
        self.k8s_client.display_deployments_table(deployments)

        if not self._confirm("View pods for a deployment?"):
            return
        dep_name = self._select("Select deployment:", [dep['name'] for dep in deployments])
        if dep_name:
            self._view_deployment_pods(dep_name)

    def _view_deployment_pods(self, deployment_name: str):
        """Display the pods of a deployment.

        Pods are looked up with the label selector ``app=<deployment_name>``;
        if that finds nothing, all pods of the namespace are shown instead.
        """
        label_selector = f"app={deployment_name}"
        console.print(f"[dim]Fetching pods for deployment {deployment_name}...[/dim]")
        pods = self.k8s_client.get_pods(self.current_namespace, label_selector=label_selector)
        if not pods:
            console.print("[yellow]No pods found with label selector, fetching all pods...[/yellow]")
            pods = self.k8s_client.get_pods(self.current_namespace)
        if pods:
            self.k8s_client.display_pods_table(pods)
        else:
            console.print("[yellow]No pods found[/yellow]")

    def _restart_deployment(self):
        """Pick a deployment and restart it after confirmation."""
        deployments = self._fetch_deployments()
        if not deployments:
            return
        dep_name = self._select(
            "Select deployment to restart:", [dep['name'] for dep in deployments]
        )
        if not dep_name:
            return
        if self._confirm(f"Are you sure you want to restart {dep_name}?"):
            self.k8s_client.restart_deployment(self.current_namespace, dep_name)

    def _scale_deployment(self):
        """Pick a deployment and scale it to a user-supplied replica count."""
        deployments = self._fetch_deployments()
        if not deployments:
            return

        # Display label -> deployment name, so the name never has to be
        # parsed back out of the label text.
        labels = {
            f"{dep['name']} (current: {dep['replicas']})": dep['name']
            for dep in deployments
        }
        selected = self._select("Select deployment to scale:", list(labels))
        if not selected:
            return
        dep_name = labels[selected]

        replicas = questionary.text(
            "Enter number of replicas:",
            validate=lambda text: self._validate_count(text, minimum=0),
            style=custom_style
        ).ask()
        # "0" is a non-empty string, so scaling to zero still passes this check.
        if replicas:
            self.k8s_client.scale_deployment(self.current_namespace, dep_name, int(replicas))

    def _edit_deployment(self):
        """Pick a deployment and open its YAML in the user's editor."""
        deployments = self._fetch_deployments()
        if not deployments:
            return
        dep_name = self._select(
            "Select deployment to edit:", [dep['name'] for dep in deployments]
        )
        if not dep_name:
            return
        deployment_yaml = self.k8s_client.get_deployment_yaml(self.current_namespace, dep_name)
        if deployment_yaml:
            self._edit_deployment_yaml(dep_name, deployment_yaml)

    def _edit_deployment_yaml(self, dep_name: str, current_yaml: str):
        """Edit deployment YAML in the user's editor and apply it on confirmation.

        Args:
            dep_name: Name of the deployment being edited.
            current_yaml: The deployment's current YAML text.

        The edited text must be valid YAML whose top level is a mapping; on a
        parse error the user may reopen their edit to fix it.
        """
        def parse(text):
            manifest = yaml.safe_load(text)
            # An empty or scalar document cannot be a deployment manifest.
            if not isinstance(manifest, dict):
                raise ValueError("Deployment YAML must be a mapping")
            return manifest

        result = self._edit_text_in_editor(current_yaml, parse)
        if result is None:
            return
        new_yaml = result[0]

        console.print("\n[bold]Deployment YAML has been modified[/bold]")
        console.print("[yellow]Warning:[/yellow] Editing deployment directly can be risky.")
        if self._confirm(f"Apply changes to deployment '{dep_name}'?"):
            self.k8s_client.update_deployment_yaml(self.current_namespace, dep_name, new_yaml)

    # ------------------------------------------------------------------
    # ConfigMaps
    # ------------------------------------------------------------------

    def _view_configmaps(self):
        """Show the ConfigMaps table and optionally the data of one ConfigMap."""
        configmaps = self._fetch_configmaps()
        if not configmaps:
            return
        self.k8s_client.display_configmaps_table(configmaps)

        if not self._confirm("View ConfigMap data?"):
            return
        cm_name = self._select("Select ConfigMap:", [cm['name'] for cm in configmaps])
        if not cm_name:
            return
        data = self.k8s_client.get_configmap_data(self.current_namespace, cm_name)
        if not data:
            return
        console.print(f"\n[bold]ConfigMap:[/bold] [cyan]{cm_name}[/cyan]\n")
        for key, value in data.items():
            console.print(f"[bold cyan]--- {key} ---[/bold cyan]")
            console.print(Syntax(value, "yaml", line_numbers=False, background_color="default"))
            console.print()

    def _edit_configmaps(self):
        """Pick a ConfigMap and open its data in the user's editor."""
        configmaps = self._fetch_configmaps()
        if not configmaps:
            return
        cm_name = self._select(
            "Select ConfigMap to edit:", [cm['name'] for cm in configmaps]
        )
        if not cm_name:
            return
        data = self.k8s_client.get_configmap_data(self.current_namespace, cm_name)
        if data:
            self._edit_configmap(cm_name, data)

    def _edit_configmap(self, cm_name: str, current_data: Dict[str, str]):
        """Edit ConfigMap data in the user's editor and apply it on confirmation.

        Args:
            cm_name: Name of the ConfigMap being edited.
            current_data: The ConfigMap's current key/value data.

        The data is presented as YAML. The edited document must be a mapping;
        keys and values are converted to strings before a summary of removed,
        added and modified keys is shown.
        """
        initial_text = yaml.dump(current_data, default_flow_style=False, allow_unicode=True)

        def parse(text):
            loaded = yaml.safe_load(text)
            if not isinstance(loaded, dict):
                raise ValueError("ConfigMap data must be a dictionary")
            return self._normalize_configmap_data(loaded)

        result = self._edit_text_in_editor(initial_text, parse)
        if result is None:
            return
        new_data = result[1]

        console.print("\n[bold]Changes:[/bold]")
        removed_keys, added_keys, modified_keys = self._diff_configmap_keys(current_data, new_data)
        if removed_keys:
            console.print(f"[red]Removed keys:[/red] {', '.join(removed_keys)}")
        if added_keys:
            console.print(f"[green]Added keys:[/green] {', '.join(added_keys)}")
        if modified_keys:
            console.print(f"[yellow]Modified keys:[/yellow] {', '.join(modified_keys)}")
        if not (removed_keys or added_keys or modified_keys):
            # The text changed (formatting, comments) but the data did not.
            console.print("[yellow]No changes detected[/yellow]")
            return

        if self._confirm(f"Apply changes to ConfigMap '{cm_name}'?"):
            self.k8s_client.update_configmap(self.current_namespace, cm_name, new_data)

    # ------------------------------------------------------------------
    # Pod logs
    # ------------------------------------------------------------------

    def _view_pod_logs(self):
        """Pick a pod (and a container when it has several) and print its logs."""
        if not self._ensure_namespace_selected():
            return
        console.print(f"[dim]Fetching pods in {self.current_namespace}...[/dim]")
        pods = self.k8s_client.get_pods(self.current_namespace)
        if not pods:
            console.print("[yellow]No pods found[/yellow]")
            return

        self.k8s_client.display_pods_table(pods)
        pod_name = self._select("Select pod:", [pod['name'] for pod in pods])
        if not pod_name:
            return

        # Treat a missing container list as empty so len() cannot fail.
        containers = self.k8s_client.get_pod_containers(self.current_namespace, pod_name) or []
        container = None
        if len(containers) > 1:
            container = self._select("Select container:", containers)
            if not container:
                return
        elif len(containers) == 1:
            container = containers[0]

        tail_lines = questionary.text(
            "Number of lines to show (default: 100):",
            default="100",
            validate=lambda text: self._validate_count(text, minimum=1),
            style=custom_style
        ).ask()
        if not tail_lines:
            return

        logs = self.k8s_client.get_pod_logs(
            self.current_namespace,
            pod_name,
            container=container,
            tail_lines=int(tail_lines)
        )
        if not logs:
            return

        title = f"[bold cyan]Logs: {pod_name}"
        if container:
            # Escape the bracketed name: Rich would otherwise parse "[name]"
            # as a markup tag rather than print it.
            title += " " + self._escape_markup(f"[{container}]")
        title += "[/bold cyan]"
        console.print(f"\n{title}\n")
        # Plain print on purpose: log text must not be interpreted as markup.
        print(logs)
        console.print()
def main():
    """Console entry point: build the application and run its menu loop.

    Ctrl+C ends the program with exit status 0 after a goodbye message; any
    other exception is reported as a fatal error and ends it with status 1.
    """
    try:
        K8sTool().run()
    except KeyboardInterrupt:
        console.print("\n[yellow]Goodbye! 👋[/yellow]")
        sys.exit(0)
    except Exception as error:
        console.print(f"[red]Fatal error:[/red] {error}")
        sys.exit(1)


# Run the tool only when this file is executed as a script.
if __name__ == "__main__":
    main()