436 lines
16 KiB
Python
436 lines
16 KiB
Python
"""Kubernetes API client wrapper."""
|
|
|
|
import os
|
|
import yaml
|
|
from typing import List, Dict, Any, Optional
|
|
from pathlib import Path
|
|
from kubernetes import client, config
|
|
from kubernetes.client.exceptions import ApiException
|
|
from rich.console import Console
|
|
from rich.table import Table
|
|
from datetime import datetime
|
|
|
|
console = Console()
|
|
|
|
|
|
class K8sClient:
|
|
"""Wrapper for Kubernetes API operations."""
|
|
|
|
def __init__(self):
|
|
"""Initialize Kubernetes client."""
|
|
try:
|
|
config.load_kube_config()
|
|
self.v1 = client.CoreV1Api()
|
|
self.apps_v1 = client.AppsV1Api()
|
|
self.current_context = self.get_current_context()
|
|
console.print(f"[green]✓[/green] Connected to Kubernetes cluster")
|
|
if self.current_context:
|
|
console.print(f"[dim]Current context: {self.current_context}[/dim]")
|
|
except Exception as e:
|
|
console.print(f"[red]✗[/red] Failed to connect to Kubernetes: {e}")
|
|
raise
|
|
|
|
def get_contexts(self) -> List[Dict[str, str]]:
|
|
"""Get list of available contexts."""
|
|
try:
|
|
contexts, active_context = config.list_kube_config_contexts()
|
|
return [{
|
|
'name': ctx['name'],
|
|
'cluster': ctx['context'].get('cluster', ''),
|
|
'user': ctx['context'].get('user', ''),
|
|
'is_active': ctx['name'] == active_context['name']
|
|
} for ctx in contexts]
|
|
except Exception as e:
|
|
console.print(f"[red]Error fetching contexts:[/red] {e}")
|
|
return []
|
|
|
|
def get_current_context(self) -> Optional[str]:
|
|
"""Get current context name."""
|
|
try:
|
|
contexts, active_context = config.list_kube_config_contexts()
|
|
return active_context['name'] if active_context else None
|
|
except Exception as e:
|
|
console.print(f"[red]Error getting current context:[/red] {e}")
|
|
return None
|
|
|
|
def switch_context(self, context_name: str) -> bool:
|
|
"""Switch to a different context."""
|
|
try:
|
|
# Load the new context
|
|
config.load_kube_config(context=context_name)
|
|
# Reinitialize API clients
|
|
self.v1 = client.CoreV1Api()
|
|
self.apps_v1 = client.AppsV1Api()
|
|
self.current_context = context_name
|
|
console.print(f"[green]✓[/green] Switched to context: [cyan]{context_name}[/cyan]")
|
|
return True
|
|
except Exception as e:
|
|
console.print(f"[red]Error switching context:[/red] {e}")
|
|
return False
|
|
|
|
def _get_kubeconfig_path(self) -> Path:
|
|
"""Get path to kubeconfig file."""
|
|
kubeconfig = os.environ.get('KUBECONFIG')
|
|
if kubeconfig:
|
|
return Path(kubeconfig)
|
|
return Path.home() / '.kube' / 'config'
|
|
|
|
def _load_kubeconfig(self) -> Optional[Dict]:
|
|
"""Load kubeconfig file."""
|
|
try:
|
|
kubeconfig_path = self._get_kubeconfig_path()
|
|
if not kubeconfig_path.exists():
|
|
console.print(f"[red]Kubeconfig not found:[/red] {kubeconfig_path}")
|
|
return None
|
|
|
|
with open(kubeconfig_path, 'r') as f:
|
|
return yaml.safe_load(f)
|
|
except Exception as e:
|
|
console.print(f"[red]Error loading kubeconfig:[/red] {e}")
|
|
return None
|
|
|
|
def _save_kubeconfig(self, kubeconfig: Dict) -> bool:
|
|
"""Save kubeconfig file."""
|
|
try:
|
|
kubeconfig_path = self._get_kubeconfig_path()
|
|
# Create backup
|
|
backup_path = kubeconfig_path.with_suffix('.backup')
|
|
if kubeconfig_path.exists():
|
|
import shutil
|
|
shutil.copy(kubeconfig_path, backup_path)
|
|
|
|
with open(kubeconfig_path, 'w') as f:
|
|
yaml.dump(kubeconfig, f, default_flow_style=False)
|
|
|
|
return True
|
|
except Exception as e:
|
|
console.print(f"[red]Error saving kubeconfig:[/red] {e}")
|
|
return False
|
|
|
|
def delete_context(self, context_name: str) -> bool:
|
|
"""Delete a context from kubeconfig."""
|
|
try:
|
|
kubeconfig = self._load_kubeconfig()
|
|
if not kubeconfig:
|
|
return False
|
|
|
|
# Check if context exists
|
|
contexts = kubeconfig.get('contexts', [])
|
|
context_names = [ctx['name'] for ctx in contexts]
|
|
|
|
if context_name not in context_names:
|
|
console.print(f"[yellow]Context '{context_name}' not found[/yellow]")
|
|
return False
|
|
|
|
# Don't allow deleting current context
|
|
if context_name == self.current_context:
|
|
console.print(f"[yellow]Cannot delete current context. Switch to another context first.[/yellow]")
|
|
return False
|
|
|
|
# Remove context
|
|
kubeconfig['contexts'] = [ctx for ctx in contexts if ctx['name'] != context_name]
|
|
|
|
# Save updated config
|
|
if self._save_kubeconfig(kubeconfig):
|
|
console.print(f"[green]✓[/green] Context '{context_name}' deleted")
|
|
return True
|
|
|
|
return False
|
|
except Exception as e:
|
|
console.print(f"[red]Error deleting context:[/red] {e}")
|
|
return False
|
|
|
|
def add_context(self, context_name: str, cluster_name: str, user_name: str,
|
|
server: str, certificate_authority: Optional[str] = None,
|
|
token: Optional[str] = None) -> bool:
|
|
"""Add a new context to kubeconfig."""
|
|
try:
|
|
kubeconfig = self._load_kubeconfig()
|
|
if not kubeconfig:
|
|
return False
|
|
|
|
# Check if context already exists
|
|
contexts = kubeconfig.get('contexts', [])
|
|
if any(ctx['name'] == context_name for ctx in contexts):
|
|
console.print(f"[yellow]Context '{context_name}' already exists[/yellow]")
|
|
return False
|
|
|
|
# Add cluster if it doesn't exist
|
|
clusters = kubeconfig.get('clusters', [])
|
|
if not any(cls['name'] == cluster_name for cls in clusters):
|
|
new_cluster = {
|
|
'name': cluster_name,
|
|
'cluster': {
|
|
'server': server
|
|
}
|
|
}
|
|
if certificate_authority:
|
|
new_cluster['cluster']['certificate-authority-data'] = certificate_authority
|
|
else:
|
|
new_cluster['cluster']['insecure-skip-tls-verify'] = True
|
|
|
|
clusters.append(new_cluster)
|
|
kubeconfig['clusters'] = clusters
|
|
|
|
# Add user if it doesn't exist
|
|
users = kubeconfig.get('users', [])
|
|
if not any(usr['name'] == user_name for usr in users):
|
|
new_user = {
|
|
'name': user_name,
|
|
'user': {}
|
|
}
|
|
if token:
|
|
new_user['user']['token'] = token
|
|
|
|
users.append(new_user)
|
|
kubeconfig['users'] = users
|
|
|
|
# Add context
|
|
new_context = {
|
|
'name': context_name,
|
|
'context': {
|
|
'cluster': cluster_name,
|
|
'user': user_name
|
|
}
|
|
}
|
|
contexts.append(new_context)
|
|
kubeconfig['contexts'] = contexts
|
|
|
|
# Save updated config
|
|
if self._save_kubeconfig(kubeconfig):
|
|
console.print(f"[green]✓[/green] Context '{context_name}' added")
|
|
return True
|
|
|
|
return False
|
|
except Exception as e:
|
|
console.print(f"[red]Error adding context:[/red] {e}")
|
|
return False
|
|
|
|
def get_namespaces(self) -> List[str]:
|
|
"""Get list of all namespaces."""
|
|
try:
|
|
namespaces = self.v1.list_namespace()
|
|
return sorted([ns.metadata.name for ns in namespaces.items])
|
|
except ApiException as e:
|
|
console.print(f"[red]Error fetching namespaces:[/red] {e}")
|
|
return []
|
|
|
|
def get_deployments(self, namespace: str) -> List[Dict[str, Any]]:
|
|
"""Get list of deployments in namespace."""
|
|
try:
|
|
deployments = self.apps_v1.list_namespaced_deployment(namespace)
|
|
return [{
|
|
'name': dep.metadata.name,
|
|
'replicas': dep.spec.replicas,
|
|
'available': dep.status.available_replicas or 0,
|
|
'ready': dep.status.ready_replicas or 0,
|
|
} for dep in deployments.items]
|
|
except ApiException as e:
|
|
console.print(f"[red]Error fetching deployments:[/red] {e}")
|
|
return []
|
|
|
|
def get_pods(self, namespace: str, label_selector: Optional[str] = None) -> List[Dict[str, Any]]:
|
|
"""Get list of pods in namespace."""
|
|
try:
|
|
if label_selector:
|
|
pods = self.v1.list_namespaced_pod(namespace, label_selector=label_selector)
|
|
else:
|
|
pods = self.v1.list_namespaced_pod(namespace)
|
|
|
|
return [{
|
|
'name': pod.metadata.name,
|
|
'status': pod.status.phase,
|
|
'ready': sum(1 for c in pod.status.container_statuses or [] if c.ready),
|
|
'total_containers': len(pod.spec.containers),
|
|
'restarts': sum(c.restart_count for c in pod.status.container_statuses or []),
|
|
'age': self._calculate_age(pod.metadata.creation_timestamp),
|
|
} for pod in pods.items]
|
|
except ApiException as e:
|
|
console.print(f"[red]Error fetching pods:[/red] {e}")
|
|
return []
|
|
|
|
def get_configmaps(self, namespace: str) -> List[Dict[str, Any]]:
|
|
"""Get list of ConfigMaps in namespace."""
|
|
try:
|
|
configmaps = self.v1.list_namespaced_config_map(namespace)
|
|
return [{
|
|
'name': cm.metadata.name,
|
|
'data_keys': list(cm.data.keys()) if cm.data else [],
|
|
'age': self._calculate_age(cm.metadata.creation_timestamp),
|
|
} for cm in configmaps.items]
|
|
except ApiException as e:
|
|
console.print(f"[red]Error fetching ConfigMaps:[/red] {e}")
|
|
return []
|
|
|
|
def get_configmap_data(self, namespace: str, name: str) -> Optional[Dict[str, str]]:
|
|
"""Get ConfigMap data."""
|
|
try:
|
|
cm = self.v1.read_namespaced_config_map(name, namespace)
|
|
return cm.data
|
|
except ApiException as e:
|
|
console.print(f"[red]Error reading ConfigMap:[/red] {e}")
|
|
return None
|
|
|
|
def update_configmap(self, namespace: str, name: str, data: Dict[str, str]) -> bool:
|
|
"""Update ConfigMap data."""
|
|
try:
|
|
# Read the existing ConfigMap first
|
|
cm = self.v1.read_namespaced_config_map(name, namespace)
|
|
# Update only the data field
|
|
cm.data = data
|
|
# Patch the ConfigMap
|
|
self.v1.patch_namespaced_config_map(name, namespace, cm)
|
|
console.print(f"[green]✓[/green] ConfigMap {name} updated successfully")
|
|
return True
|
|
except ApiException as e:
|
|
console.print(f"[red]Error updating ConfigMap:[/red] {e}")
|
|
return False
|
|
|
|
def restart_deployment(self, namespace: str, name: str) -> bool:
|
|
"""Restart deployment by updating annotation."""
|
|
try:
|
|
now = datetime.utcnow().isoformat() + "Z"
|
|
body = {
|
|
'spec': {
|
|
'template': {
|
|
'metadata': {
|
|
'annotations': {
|
|
'kubectl.kubernetes.io/restartedAt': now
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
self.apps_v1.patch_namespaced_deployment(name, namespace, body)
|
|
console.print(f"[green]✓[/green] Deployment {name} restarted")
|
|
return True
|
|
except ApiException as e:
|
|
console.print(f"[red]Error restarting deployment:[/red] {e}")
|
|
return False
|
|
|
|
def scale_deployment(self, namespace: str, name: str, replicas: int) -> bool:
|
|
"""Scale deployment to specified number of replicas."""
|
|
try:
|
|
body = {'spec': {'replicas': replicas}}
|
|
self.apps_v1.patch_namespaced_deployment_scale(name, namespace, body)
|
|
console.print(f"[green]✓[/green] Deployment {name} scaled to {replicas} replicas")
|
|
return True
|
|
except ApiException as e:
|
|
console.print(f"[red]Error scaling deployment:[/red] {e}")
|
|
return False
|
|
|
|
def get_pod_logs(self, namespace: str, pod_name: str, container: Optional[str] = None,
|
|
tail_lines: int = 100) -> Optional[str]:
|
|
"""Get logs from pod."""
|
|
try:
|
|
kwargs = {'name': pod_name, 'namespace': namespace, 'tail_lines': tail_lines}
|
|
if container:
|
|
kwargs['container'] = container
|
|
|
|
logs = self.v1.read_namespaced_pod_log(**kwargs)
|
|
return logs
|
|
except ApiException as e:
|
|
console.print(f"[red]Error fetching logs:[/red] {e}")
|
|
return None
|
|
|
|
def get_pod_containers(self, namespace: str, pod_name: str) -> List[str]:
|
|
"""Get list of containers in pod."""
|
|
try:
|
|
pod = self.v1.read_namespaced_pod(pod_name, namespace)
|
|
return [container.name for container in pod.spec.containers]
|
|
except ApiException as e:
|
|
console.print(f"[red]Error fetching pod info:[/red] {e}")
|
|
return []
|
|
|
|
@staticmethod
|
|
def _calculate_age(timestamp: datetime) -> str:
|
|
"""Calculate age from timestamp."""
|
|
if not timestamp:
|
|
return "Unknown"
|
|
|
|
now = datetime.now(timestamp.tzinfo)
|
|
delta = now - timestamp
|
|
|
|
if delta.days > 0:
|
|
return f"{delta.days}d"
|
|
elif delta.seconds >= 3600:
|
|
return f"{delta.seconds // 3600}h"
|
|
elif delta.seconds >= 60:
|
|
return f"{delta.seconds // 60}m"
|
|
else:
|
|
return f"{delta.seconds}s"
|
|
|
|
def display_deployments_table(self, deployments: List[Dict[str, Any]]):
|
|
"""Display deployments in a table."""
|
|
table = Table(title="Deployments")
|
|
table.add_column("Name", style="cyan")
|
|
table.add_column("Replicas", style="magenta")
|
|
table.add_column("Available", style="green")
|
|
table.add_column("Ready", style="yellow")
|
|
|
|
for dep in deployments:
|
|
table.add_row(
|
|
dep['name'],
|
|
str(dep['replicas']),
|
|
str(dep['available']),
|
|
str(dep['ready'])
|
|
)
|
|
|
|
console.print(table)
|
|
|
|
def display_pods_table(self, pods: List[Dict[str, Any]]):
|
|
"""Display pods in a table."""
|
|
table = Table(title="Pods")
|
|
table.add_column("Name", style="cyan")
|
|
table.add_column("Status", style="magenta")
|
|
table.add_column("Ready", style="green")
|
|
table.add_column("Restarts", style="yellow")
|
|
table.add_column("Age", style="blue")
|
|
|
|
for pod in pods:
|
|
status_color = "green" if pod['status'] == "Running" else "red"
|
|
table.add_row(
|
|
pod['name'],
|
|
f"[{status_color}]{pod['status']}[/{status_color}]",
|
|
f"{pod['ready']}/{pod['total_containers']}",
|
|
str(pod['restarts']),
|
|
pod['age']
|
|
)
|
|
|
|
console.print(table)
|
|
|
|
def display_configmaps_table(self, configmaps: List[Dict[str, Any]]):
|
|
"""Display ConfigMaps in a table."""
|
|
table = Table(title="ConfigMaps")
|
|
table.add_column("Name", style="cyan")
|
|
table.add_column("Keys", style="magenta")
|
|
table.add_column("Age", style="blue")
|
|
|
|
for cm in configmaps:
|
|
table.add_row(
|
|
cm['name'],
|
|
", ".join(cm['data_keys']) if cm['data_keys'] else "None",
|
|
cm['age']
|
|
)
|
|
|
|
console.print(table)
|
|
|
|
def display_contexts_table(self, contexts: List[Dict[str, str]]):
|
|
"""Display contexts in a table."""
|
|
table = Table(title="Kubernetes Contexts")
|
|
table.add_column("Name", style="cyan")
|
|
table.add_column("Cluster", style="magenta")
|
|
table.add_column("User", style="yellow")
|
|
table.add_column("Active", style="green")
|
|
|
|
for ctx in contexts:
|
|
active_mark = "✓" if ctx['is_active'] else ""
|
|
table.add_row(
|
|
ctx['name'],
|
|
ctx['cluster'],
|
|
ctx['user'],
|
|
active_mark
|
|
)
|
|
|
|
console.print(table)
|