# k8s-tool/k8s_tool/k8s_client.py
# 2025-10-26 13:45:12 +03:00
#
# 509 lines
# 19 KiB
# Python

"""Kubernetes API client wrapper."""
import os
import yaml
from typing import List, Dict, Any, Optional
from pathlib import Path
from kubernetes import client, config
from kubernetes.client.exceptions import ApiException
from rich.console import Console
from rich.table import Table
from datetime import datetime
console = Console()
class K8sClient:
    """Wrapper for Kubernetes API operations.

    Bundles three groups of functionality:

    * kubeconfig context management (list / switch / add / delete), done by
      reading and rewriting the kubeconfig YAML file directly;
    * cluster queries and mutations (namespaces, deployments, pods,
      ConfigMaps, logs) through ``CoreV1Api`` and ``AppsV1Api``;
    * rendering of the query results as ``rich`` tables.

    Methods report failures by printing to the module-level ``console`` and
    returning a neutral value (``False``, ``None`` or an empty list) instead
    of raising; only ``__init__`` re-raises.
    """

    def __init__(self):
        """Load the kubeconfig and create the API clients.

        Sets ``self.v1`` (``CoreV1Api``), ``self.apps_v1`` (``AppsV1Api``)
        and ``self.current_context``.

        Raises:
            Exception: whatever the configuration loading or client
                construction raised, re-raised after printing it.
        """
        try:
            config.load_kube_config()
            self.v1 = client.CoreV1Api()
            self.apps_v1 = client.AppsV1Api()
            self.current_context = self.get_current_context()
            console.print("[green]✓[/green] Connected to Kubernetes cluster")
            if self.current_context:
                console.print(f"[dim]Current context: {self.current_context}[/dim]")
        except Exception as e:
            console.print(f"[red]✗[/red] Failed to connect to Kubernetes: {e}")
            raise

    def get_contexts(self) -> List[Dict[str, str]]:
        """Get list of available contexts.

        Returns:
            One dict per context with the keys ``name``, ``cluster``,
            ``user`` and ``is_active``. An empty list is returned (after
            printing the error) if the contexts cannot be listed.
        """
        try:
            contexts, active_context = config.list_kube_config_contexts()
            # Guard against a missing active context, the same way
            # get_current_context() does.
            active_name = active_context['name'] if active_context else None
            return [{
                'name': ctx['name'],
                'cluster': ctx['context'].get('cluster', ''),
                'user': ctx['context'].get('user', ''),
                'is_active': ctx['name'] == active_name,
            } for ctx in contexts]
        except Exception as e:
            console.print(f"[red]Error fetching contexts:[/red] {e}")
            return []

    def get_current_context(self) -> Optional[str]:
        """Get current context name.

        Returns:
            The name of the active context, or ``None`` when there is no
            active context or the lookup failed (the error is printed).
        """
        try:
            _, active_context = config.list_kube_config_contexts()
            return active_context['name'] if active_context else None
        except Exception as e:
            console.print(f"[red]Error getting current context:[/red] {e}")
            return None

    def switch_context(self, context_name: str) -> bool:
        """Switch to a different context.

        The ``current-context`` entry of the kubeconfig file is rewritten
        first, then the configuration is reloaded for ``context_name`` and
        both API clients are recreated.

        Args:
            context_name: Name of a context present in the kubeconfig file.

        Returns:
            ``True`` on success; ``False`` if the file could not be loaded
            or saved, the context is unknown, or any step raised.
        """
        try:
            kubeconfig = self._load_kubeconfig()
            if not kubeconfig:
                return False

            # `or []` also covers a key that is present but null.
            contexts = kubeconfig.get('contexts') or []
            if not any(ctx['name'] == context_name for ctx in contexts):
                console.print(f"[yellow]Context '{context_name}' not found[/yellow]")
                return False

            # Persist the choice in the kubeconfig file.
            kubeconfig['current-context'] = context_name
            if not self._save_kubeconfig(kubeconfig):
                return False

            # Load the new context and rebuild the clients against it.
            config.load_kube_config(context=context_name)
            self.v1 = client.CoreV1Api()
            self.apps_v1 = client.AppsV1Api()
            self.current_context = context_name
            console.print(f"[green]✓[/green] Switched to context: [cyan]{context_name}[/cyan]")
            return True
        except Exception as e:
            console.print(f"[red]Error switching context:[/red] {e}")
            return False

    def _get_kubeconfig_path(self) -> Path:
        """Get path to kubeconfig file.

        ``KUBECONFIG`` may contain several paths joined with ``os.pathsep``.
        The first listed file that exists is returned; if none exists, the
        first entry is returned. Without ``KUBECONFIG`` the result is
        ``~/.kube/config``.

        NOTE(review): only one file is ever read or written by this class;
        contexts defined in the other listed files are not seen here.
        """
        raw = os.environ.get('KUBECONFIG')
        if raw:
            candidates = [Path(part).expanduser() for part in raw.split(os.pathsep) if part]
            for candidate in candidates:
                if candidate.exists():
                    return candidate
            if candidates:
                return candidates[0]
        return Path.home() / '.kube' / 'config'

    def _load_kubeconfig(self) -> Optional[Dict]:
        """Load kubeconfig file.

        Returns:
            The parsed YAML mapping, or ``None`` (after printing a message)
            when the file is missing, unreadable, or does not contain a
            mapping at the top level.
        """
        try:
            kubeconfig_path = self._get_kubeconfig_path()
            if not kubeconfig_path.exists():
                console.print(f"[red]Kubeconfig not found:[/red] {kubeconfig_path}")
                return None
            with open(kubeconfig_path, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
            if not isinstance(data, dict):
                # An empty file parses to None; report it instead of letting
                # callers fail without any message.
                console.print(f"[red]Error loading kubeconfig:[/red] {kubeconfig_path} is empty or not a mapping")
                return None
            return data
        except Exception as e:
            console.print(f"[red]Error loading kubeconfig:[/red] {e}")
            return None

    def _save_kubeconfig(self, kubeconfig: Dict) -> bool:
        """Save kubeconfig file.

        If the file already exists it is first copied next to itself with
        the suffix replaced by ``.backup``; then the file is overwritten
        with the YAML dump of ``kubeconfig``.

        Returns:
            ``True`` on success, ``False`` (after printing the error) if
            the backup or the write raised.
        """
        try:
            import shutil

            kubeconfig_path = self._get_kubeconfig_path()
            backup_path = kubeconfig_path.with_suffix('.backup')
            if kubeconfig_path.exists():
                shutil.copy(kubeconfig_path, backup_path)
            with open(kubeconfig_path, 'w', encoding='utf-8') as f:
                yaml.safe_dump(kubeconfig, f, default_flow_style=False)
            return True
        except Exception as e:
            console.print(f"[red]Error saving kubeconfig:[/red] {e}")
            return False

    def delete_context(self, context_name: str) -> bool:
        """Delete a context from kubeconfig.

        Only the entry in ``contexts`` is removed; cluster and user entries
        are left in place. The current context (the one held by this client
        or the one recorded as ``current-context`` in the file) cannot be
        deleted.

        Returns:
            ``True`` if the context was removed and the file saved,
            ``False`` otherwise.
        """
        try:
            kubeconfig = self._load_kubeconfig()
            if not kubeconfig:
                return False

            contexts = kubeconfig.get('contexts') or []
            if not any(ctx['name'] == context_name for ctx in contexts):
                console.print(f"[yellow]Context '{context_name}' not found[/yellow]")
                return False

            # Deleting the active context would leave `current-context`
            # pointing at nothing.
            if context_name in (self.current_context, kubeconfig.get('current-context')):
                console.print("[yellow]Cannot delete current context. Switch to another context first.[/yellow]")
                return False

            kubeconfig['contexts'] = [ctx for ctx in contexts if ctx['name'] != context_name]

            if self._save_kubeconfig(kubeconfig):
                console.print(f"[green]✓[/green] Context '{context_name}' deleted")
                return True
            return False
        except Exception as e:
            console.print(f"[red]Error deleting context:[/red] {e}")
            return False

    def add_context(self, context_name: str, cluster_name: str, user_name: str,
                    server: str, certificate_authority: Optional[str] = None,
                    token: Optional[str] = None) -> bool:
        """Add a new context to kubeconfig.

        A cluster entry and a user entry are created only when no entry
        with the given name exists yet; for an existing cluster or user the
        ``server``, ``certificate_authority`` and ``token`` arguments are
        not applied.

        Args:
            context_name: Name of the new context; must not exist yet.
            cluster_name: Name of the cluster entry to reference or create.
            user_name: Name of the user entry to reference or create.
            server: API server URL stored in a newly created cluster entry.
            certificate_authority: Stored under ``certificate-authority-data``
                of a new cluster entry (presumably base64-encoded CA data;
                confirm against callers).
            token: Bearer token stored in a newly created user entry.

        Returns:
            ``True`` if the context was added and the file saved,
            ``False`` otherwise.
        """
        try:
            kubeconfig = self._load_kubeconfig()
            if not kubeconfig:
                return False

            # `or []` covers sections that are present but null.
            contexts = kubeconfig.get('contexts') or []
            if any(ctx['name'] == context_name for ctx in contexts):
                console.print(f"[yellow]Context '{context_name}' already exists[/yellow]")
                return False

            clusters = kubeconfig.get('clusters') or []
            if not any(entry['name'] == cluster_name for entry in clusters):
                cluster_settings: Dict[str, Any] = {'server': server}
                if certificate_authority:
                    cluster_settings['certificate-authority-data'] = certificate_authority
                else:
                    # NOTE(security): without CA data the cluster entry
                    # disables TLS verification entirely.
                    cluster_settings['insecure-skip-tls-verify'] = True
                clusters.append({'name': cluster_name, 'cluster': cluster_settings})
            kubeconfig['clusters'] = clusters

            users = kubeconfig.get('users') or []
            if not any(entry['name'] == user_name for entry in users):
                user_settings: Dict[str, Any] = {}
                if token:
                    user_settings['token'] = token
                users.append({'name': user_name, 'user': user_settings})
            kubeconfig['users'] = users

            contexts.append({
                'name': context_name,
                'context': {
                    'cluster': cluster_name,
                    'user': user_name,
                },
            })
            kubeconfig['contexts'] = contexts

            if self._save_kubeconfig(kubeconfig):
                console.print(f"[green]✓[/green] Context '{context_name}' added")
                return True
            return False
        except Exception as e:
            console.print(f"[red]Error adding context:[/red] {e}")
            return False

    def get_namespaces(self) -> List[str]:
        """Get list of all namespaces.

        Returns:
            Namespace names in sorted order, or an empty list if the API
            call raised ``ApiException`` (the error is printed).
        """
        try:
            namespaces = self.v1.list_namespace()
            return sorted(ns.metadata.name for ns in namespaces.items)
        except ApiException as e:
            console.print(f"[red]Error fetching namespaces:[/red] {e}")
            return []

    def get_deployments(self, namespace: str) -> List[Dict[str, Any]]:
        """Get list of deployments in namespace.

        Returns:
            One dict per deployment with the keys ``name``, ``replicas``,
            ``available`` and ``ready`` (the latter two default to 0 when
            the status does not report them). Empty list on ``ApiException``.
        """
        try:
            deployments = self.apps_v1.list_namespaced_deployment(namespace)
            return [{
                'name': dep.metadata.name,
                'replicas': dep.spec.replicas,
                'available': dep.status.available_replicas or 0,
                'ready': dep.status.ready_replicas or 0,
            } for dep in deployments.items]
        except ApiException as e:
            console.print(f"[red]Error fetching deployments:[/red] {e}")
            return []

    def get_pods(self, namespace: str, label_selector: Optional[str] = None) -> List[Dict[str, Any]]:
        """Get list of pods in namespace.

        Args:
            namespace: Namespace to list.
            label_selector: Optional label selector forwarded to the API;
                omitted from the call when empty.

        Returns:
            One dict per pod with the keys ``name``, ``status`` (the pod
            phase), ``ready`` (number of ready containers),
            ``total_containers``, ``restarts`` (summed over containers) and
            ``age``. Empty list on ``ApiException``.
        """
        try:
            list_kwargs = {'label_selector': label_selector} if label_selector else {}
            pods = self.v1.list_namespaced_pod(namespace, **list_kwargs)
            rows = []
            for pod in pods.items:
                # container_statuses is None until the kubelet reports them.
                statuses = pod.status.container_statuses or []
                rows.append({
                    'name': pod.metadata.name,
                    'status': pod.status.phase,
                    'ready': sum(1 for c in statuses if c.ready),
                    'total_containers': len(pod.spec.containers),
                    'restarts': sum(c.restart_count for c in statuses),
                    'age': self._calculate_age(pod.metadata.creation_timestamp),
                })
            return rows
        except ApiException as e:
            console.print(f"[red]Error fetching pods:[/red] {e}")
            return []

    def get_configmaps(self, namespace: str) -> List[Dict[str, Any]]:
        """Get list of ConfigMaps in namespace.

        Returns:
            One dict per ConfigMap with the keys ``name``, ``data_keys``
            (list of keys, empty when there is no data) and ``age``.
            Empty list on ``ApiException``.
        """
        try:
            configmaps = self.v1.list_namespaced_config_map(namespace)
            return [{
                'name': cm.metadata.name,
                'data_keys': list(cm.data) if cm.data else [],
                'age': self._calculate_age(cm.metadata.creation_timestamp),
            } for cm in configmaps.items]
        except ApiException as e:
            console.print(f"[red]Error fetching ConfigMaps:[/red] {e}")
            return []

    def get_configmap_data(self, namespace: str, name: str) -> Optional[Dict[str, str]]:
        """Get ConfigMap data.

        Returns:
            The ``data`` mapping of the ConfigMap (an empty dict when the
            ConfigMap has no data), or ``None`` if reading it raised
            ``ApiException``.
        """
        try:
            cm = self.v1.read_namespaced_config_map(name, namespace)
            # Keep None reserved for the error case below.
            return cm.data or {}
        except ApiException as e:
            console.print(f"[red]Error reading ConfigMap:[/red] {e}")
            return None

    def update_configmap(self, namespace: str, name: str, data: Dict[str, str]) -> bool:
        """Update ConfigMap data.

        The existing object is read, its ``data`` field is set to ``data``
        and the object is written back with a replace call, so keys that are
        absent from ``data`` are removed from the ConfigMap as well.

        Returns:
            ``True`` on success, ``False`` on ``ApiException``.
        """
        try:
            cm = self.v1.read_namespaced_config_map(name, namespace)
            cm.data = data
            # Replace rather than patch: a merge-style patch keeps keys that
            # are missing from the request body, so deletions would be lost.
            self.v1.replace_namespaced_config_map(name, namespace, cm)
            console.print(f"[green]✓[/green] ConfigMap {name} updated successfully")
            return True
        except ApiException as e:
            console.print(f"[red]Error updating ConfigMap:[/red] {e}")
            return False

    def restart_deployment(self, namespace: str, name: str) -> bool:
        """Restart deployment by updating annotation.

        Patches the pod template annotation
        ``kubectl.kubernetes.io/restartedAt`` with the current UTC time.

        Returns:
            ``True`` on success, ``False`` on ``ApiException``.
        """
        try:
            from datetime import timezone

            # Aware clock instead of the deprecated utcnow(); tzinfo is
            # dropped so the rendered value keeps the "<iso>Z" form.
            now = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
            body = {
                'spec': {
                    'template': {
                        'metadata': {
                            'annotations': {
                                'kubectl.kubernetes.io/restartedAt': now
                            }
                        }
                    }
                }
            }
            self.apps_v1.patch_namespaced_deployment(name, namespace, body)
            console.print(f"[green]✓[/green] Deployment {name} restarted")
            return True
        except ApiException as e:
            console.print(f"[red]Error restarting deployment:[/red] {e}")
            return False

    def scale_deployment(self, namespace: str, name: str, replicas: int) -> bool:
        """Scale deployment to specified number of replicas.

        Returns:
            ``True`` on success, ``False`` on ``ApiException``.
        """
        try:
            body = {'spec': {'replicas': replicas}}
            self.apps_v1.patch_namespaced_deployment_scale(name, namespace, body)
            console.print(f"[green]✓[/green] Deployment {name} scaled to {replicas} replicas")
            return True
        except ApiException as e:
            console.print(f"[red]Error scaling deployment:[/red] {e}")
            return False

    def get_deployment_yaml(self, namespace: str, name: str) -> Optional[str]:
        """Get deployment as YAML string.

        The deployment is serialized with the API client's own serializer,
        so keys use the Kubernetes wire names (``apiVersion``,
        ``matchLabels``, ...) and unset fields are omitted. Server-managed
        metadata and the ``status`` section are stripped so the result is
        suitable for editing.

        Returns:
            The YAML text, or ``None`` if reading the deployment raised
            ``ApiException``.
        """
        try:
            deployment = self.apps_v1.read_namespaced_deployment(name, namespace)
            # to_dict() would produce snake_case attribute names and explicit
            # nulls, which is not a valid manifest.
            deployment_dict = self.apps_v1.api_client.sanitize_for_serialization(deployment)

            # Remove fields that shouldn't be edited.
            metadata = deployment_dict.get('metadata') or {}
            for server_field in ('managedFields', 'resourceVersion', 'uid',
                                 'selfLink', 'creationTimestamp', 'generation'):
                metadata.pop(server_field, None)
            deployment_dict.pop('status', None)

            return yaml.safe_dump(deployment_dict, default_flow_style=False, sort_keys=False)
        except ApiException as e:
            console.print(f"[red]Error reading deployment:[/red] {e}")
            return None

    def update_deployment_yaml(self, namespace: str, name: str, yaml_str: str) -> bool:
        """Update deployment from YAML string.

        The parsed YAML is sent as the body of a deployment patch.

        Returns:
            ``True`` on success; ``False`` if the YAML is invalid, is not a
            mapping at the top level, or the API call raised ``ApiException``.
        """
        try:
            deployment_dict = yaml.safe_load(yaml_str)
            if not isinstance(deployment_dict, dict):
                # Empty or scalar input would otherwise reach the API client
                # as an invalid body and raise an uncaught error.
                console.print("[red]Error parsing YAML:[/red] expected a mapping at the top level")
                return False
            self.apps_v1.patch_namespaced_deployment(
                name=name,
                namespace=namespace,
                body=deployment_dict
            )
            console.print(f"[green]✓[/green] Deployment {name} updated successfully")
            return True
        except ApiException as e:
            console.print(f"[red]Error updating deployment:[/red] {e}")
            return False
        except yaml.YAMLError as e:
            console.print(f"[red]Error parsing YAML:[/red] {e}")
            return False

    def get_pod_logs(self, namespace: str, pod_name: str, container: Optional[str] = None,
                     tail_lines: int = 100) -> Optional[str]:
        """Get logs from pod.

        Args:
            namespace: Namespace of the pod.
            pod_name: Name of the pod.
            container: Optional container name; omitted from the request
                when empty.
            tail_lines: Number of trailing log lines to request.

        Returns:
            The log text, or ``None`` on ``ApiException``.
        """
        try:
            kwargs = {'name': pod_name, 'namespace': namespace, 'tail_lines': tail_lines}
            if container:
                kwargs['container'] = container
            return self.v1.read_namespaced_pod_log(**kwargs)
        except ApiException as e:
            console.print(f"[red]Error fetching logs:[/red] {e}")
            return None

    def get_pod_containers(self, namespace: str, pod_name: str) -> List[str]:
        """Get list of containers in pod.

        Returns:
            Names of the containers in the pod spec, or an empty list on
            ``ApiException``.
        """
        try:
            pod = self.v1.read_namespaced_pod(pod_name, namespace)
            return [container.name for container in pod.spec.containers]
        except ApiException as e:
            console.print(f"[red]Error fetching pod info:[/red] {e}")
            return []

    @staticmethod
    def _calculate_age(timestamp: datetime) -> str:
        """Calculate age from timestamp.

        Args:
            timestamp: Creation time; "now" is taken in the same tzinfo.

        Returns:
            ``"Unknown"`` for a falsy timestamp, otherwise the age in its
            largest non-zero unit: ``"<n>d"``, ``"<n>h"``, ``"<n>m"`` or
            ``"<n>s"``. A timestamp in the future yields ``"0s"``.
        """
        if not timestamp:
            return "Unknown"
        elapsed = datetime.now(timestamp.tzinfo) - timestamp
        # Clamp at zero: a negative timedelta has days == -1 and a large
        # positive `seconds`, which would otherwise render as e.g. "23h".
        total_seconds = max(int(elapsed.total_seconds()), 0)
        days, remainder = divmod(total_seconds, 86400)
        if days > 0:
            return f"{days}d"
        if remainder >= 3600:
            return f"{remainder // 3600}h"
        if remainder >= 60:
            return f"{remainder // 60}m"
        return f"{remainder}s"

    def display_deployments_table(self, deployments: List[Dict[str, Any]]):
        """Display deployments in a table.

        Args:
            deployments: Rows with the keys ``name``, ``replicas``,
                ``available`` and ``ready``.
        """
        table = Table(title="Deployments")
        table.add_column("Name", style="cyan")
        table.add_column("Replicas", style="magenta")
        table.add_column("Available", style="green")
        table.add_column("Ready", style="yellow")
        for dep in deployments:
            table.add_row(
                dep['name'],
                str(dep['replicas']),
                str(dep['available']),
                str(dep['ready'])
            )
        console.print(table)

    def display_pods_table(self, pods: List[Dict[str, Any]]):
        """Display pods in a table.

        The status cell is green for ``Running`` and red for anything else.

        Args:
            pods: Rows with the keys ``name``, ``status``, ``ready``,
                ``total_containers``, ``restarts`` and ``age``.
        """
        table = Table(title="Pods")
        table.add_column("Name", style="cyan")
        table.add_column("Status", style="magenta")
        table.add_column("Ready", style="green")
        table.add_column("Restarts", style="yellow")
        table.add_column("Age", style="blue")
        for pod in pods:
            status_color = "green" if pod['status'] == "Running" else "red"
            table.add_row(
                pod['name'],
                f"[{status_color}]{pod['status']}[/{status_color}]",
                f"{pod['ready']}/{pod['total_containers']}",
                str(pod['restarts']),
                pod['age']
            )
        console.print(table)

    def display_configmaps_table(self, configmaps: List[Dict[str, Any]]):
        """Display ConfigMaps in a table.

        Args:
            configmaps: Rows with the keys ``name``, ``data_keys`` and
                ``age``; an empty key list is shown as ``None``.
        """
        table = Table(title="ConfigMaps")
        table.add_column("Name", style="cyan")
        table.add_column("Keys", style="magenta")
        table.add_column("Age", style="blue")
        for cm in configmaps:
            table.add_row(
                cm['name'],
                ", ".join(cm['data_keys']) if cm['data_keys'] else "None",
                cm['age']
            )
        console.print(table)

    def display_contexts_table(self, contexts: List[Dict[str, str]]):
        """Display contexts in a table.

        The ``Active`` column shows a check mark for the active context and
        is empty for all others.

        Args:
            contexts: Rows with the keys ``name``, ``cluster``, ``user``
                and ``is_active``.
        """
        table = Table(title="Kubernetes Contexts")
        table.add_column("Name", style="cyan")
        table.add_column("Cluster", style="magenta")
        table.add_column("User", style="yellow")
        table.add_column("Active", style="green")
        for ctx in contexts:
            # Both branches used to be empty strings, so the active context
            # was never marked.
            active_mark = "✓" if ctx['is_active'] else ""
            table.add_row(
                ctx['name'],
                ctx['cluster'],
                ctx['user'],
                active_mark
            )
        console.print(table)