feat: k8s operator

This commit is contained in:
2025-05-12 12:34:09 +07:00
parent cffe651d6f
commit 952928ffcb
19 changed files with 2486 additions and 0 deletions

View File

@@ -0,0 +1,446 @@
import { PrismaClient } from '@minikura/db';
import type { Server, ReverseProxyServer, CustomEnvironmentVariable } from '@minikura/db';
import { KubernetesClient } from './k8s-client';
import { API_GROUP, API_VERSION, LABEL_PREFIX } from '../config/constants';
import { MINECRAFT_SERVER_CRD } from '../crds/server';
import { REVERSE_PROXY_SERVER_CRD } from '../crds/reverseProxy';
/**
* Sets up the CRD registration and starts a reflector to sync database state to CRDs
*/
export async function setupCRDRegistration(
prisma: PrismaClient,
k8sClient: KubernetesClient,
namespace: string
): Promise<void> {
await registerCRDs(k8sClient);
await startCRDReflector(prisma, k8sClient, namespace);
}
/**
* Registers the CRDs with the Kubernetes API
*/
async function registerCRDs(k8sClient: KubernetesClient): Promise<void> {
try {
const apiExtensionsClient = k8sClient.getApiExtensionsApi();
console.log('Registering CRDs...');
try {
await apiExtensionsClient.createCustomResourceDefinition(MINECRAFT_SERVER_CRD);
console.log(`MinecraftServer CRD created successfully (${API_GROUP}/${API_VERSION})`);
} catch (error: any) {
if (error.response?.statusCode === 409) {
// TODO: Handle conflict
console.log('MinecraftServer CRD already exists');
} else {
console.error('Error creating MinecraftServer CRD:', error);
}
}
try {
await apiExtensionsClient.createCustomResourceDefinition(REVERSE_PROXY_SERVER_CRD);
console.log(`ReverseProxyServer CRD created successfully (${API_GROUP}/${API_VERSION})`);
} catch (error: any) {
if (error.response?.statusCode === 409) {
// TODO: Handle conflict
console.log('ReverseProxyServer CRD already exists');
} else {
console.error('Error creating ReverseProxyServer CRD:', error);
}
}
} catch (error) {
console.error('Error registering CRDs:', error);
throw error;
}
}
/**
* Starts a reflector to sync database state to CRDs
*/
async function startCRDReflector(
prisma: PrismaClient,
k8sClient: KubernetesClient,
namespace: string
): Promise<void> {
const customObjectsApi = k8sClient.getCustomObjectsApi();
// Keep track which server IDs have corresponding CRs
const reflectedMinecraftServers = new Map<string, string>(); // DB ID -> CR name
const reflectedReverseProxyServers = new Map<string, string>(); // DB ID -> CR name
console.log('Starting CRD reflector...');
// Initial sync to create CRs that reflect the DB state
await syncDBtoCRDs(prisma, customObjectsApi, namespace,
reflectedMinecraftServers, reflectedReverseProxyServers);
// Polling interval to check for changes in the DB
// TODO: Make this listener instead
setInterval(async () => {
await syncDBtoCRDs(prisma, customObjectsApi, namespace,
reflectedMinecraftServers, reflectedReverseProxyServers);
}, 30 * 1000);
}
/**
* Synchronizes database state to CRDs
*/
async function syncDBtoCRDs(
prisma: PrismaClient,
customObjectsApi: any,
namespace: string,
reflectedMinecraftServers: Map<string, string>,
reflectedReverseProxyServers: Map<string, string>
): Promise<void> {
try {
console.log(`[${new Date().toISOString()}] Starting CRD sync operation...`);
await syncMinecraftServers(prisma, customObjectsApi, namespace, reflectedMinecraftServers);
await syncReverseProxyServers(prisma, customObjectsApi, namespace, reflectedReverseProxyServers);
console.log(`[${new Date().toISOString()}] CRD sync operation completed`);
} catch (error) {
console.error(`[${new Date().toISOString()}] Error syncing database to CRDs:`, error);
}
}
/**
* Synchronizes Minecraft server objects from the database to CRDs
*/
async function syncMinecraftServers(
prisma: PrismaClient,
customObjectsApi: any,
namespace: string,
reflectedMinecraftServers: Map<string, string>
): Promise<void> {
try {
const servers = await prisma.server.findMany();
let existingCRs: any[] = [];
try {
const response = await customObjectsApi.listNamespacedCustomObject(
API_GROUP,
API_VERSION,
namespace,
'minecraftservers'
);
existingCRs = (response.body as any).items || [];
} catch (error) {
console.error('Error listing MinecraftServer CRs:', error);
// TODO: Potentially better error handling here
// For now, continue anyway - it might just be that none exist yet
}
// Map CR names to their corresponding DB IDs
const existingCRMap = new Map<string, string>();
// Map of CR names to their resourceVersions for updates
const crResourceVersions = new Map<string, string>();
for (const cr of existingCRs) {
const internalId = cr.status?.internalId;
if (internalId) {
existingCRMap.set(internalId, cr.metadata.name);
// Store the resourceVersion for later
if (cr.metadata?.resourceVersion) {
crResourceVersions.set(cr.metadata.name, cr.metadata.resourceVersion);
}
}
}
// Refresh tracking map
reflectedMinecraftServers.clear();
// Create or update CRs for each server
for (const server of servers) {
const crName = existingCRMap.get(server.id) || `${server.id.toLowerCase()}`;
// Build the CR object
const serverCR: {
apiVersion: string,
kind: string,
metadata: {
name: string,
namespace: string,
annotations: Record<string, string>,
resourceVersion?: string
},
spec: any,
status: any
} = {
apiVersion: `${API_GROUP}/${API_VERSION}`,
kind: 'MinecraftServer',
metadata: {
name: crName,
namespace: namespace,
annotations: {
[`${LABEL_PREFIX}/database-managed`]: 'true',
[`${LABEL_PREFIX}/last-synced`]: new Date().toISOString()
}
},
spec: {
id: server.id,
description: server.description,
listen_port: server.listen_port,
type: server.type,
memory: server.memory
},
status: {
phase: 'Running',
message: 'Managed by database',
internalId: server.id,
apiKey: '[REDACTED]', // Don't expose actual API key
lastSyncedAt: new Date().toISOString()
}
};
try {
if (existingCRMap.has(server.id)) {
// Update existing CR
// Get the current resource first
const crName = existingCRMap.get(server.id)!;
try {
// Get the existing resource to get the current resourceVersion
const existingResource = await customObjectsApi.getNamespacedCustomObject(
API_GROUP,
API_VERSION,
namespace,
'minecraftservers',
crName
);
// Extract the resourceVersion from the existing resource
if (existingResource?.body?.metadata?.resourceVersion) {
// Add the resourceVersion to our custom resource
serverCR.metadata.resourceVersion = existingResource.body.metadata.resourceVersion;
}
// Now update with the correct resourceVersion
await customObjectsApi.replaceNamespacedCustomObject(
API_GROUP,
API_VERSION,
namespace,
'minecraftservers',
crName,
serverCR
);
console.log(`Updated MinecraftServer CR ${crName} for server ${server.id}`);
} catch (error) {
console.error(`Error getting/updating MinecraftServer CR for ${server.id}:`, error);
}
} else {
// Create new CR
await customObjectsApi.createNamespacedCustomObject(
API_GROUP,
API_VERSION,
namespace,
'minecraftservers',
serverCR
);
console.log(`Created MinecraftServer CR ${crName} for server ${server.id}`);
}
// Remember this mapping
reflectedMinecraftServers.set(server.id, crName);
} catch (error) {
console.error(`Error creating/updating MinecraftServer CR for ${server.id}:`, error);
}
}
// Delete CRs for servers that no longer exist
for (const [dbId, crName] of existingCRMap.entries()) {
if (!servers.some(s => s.id === dbId)) {
try {
await customObjectsApi.deleteNamespacedCustomObject(
API_GROUP,
API_VERSION,
namespace,
'minecraftservers',
crName
);
console.log(`Deleted MinecraftServer CR ${crName} for removed server ID ${dbId}`);
} catch (error) {
console.error(`Error deleting MinecraftServer CR ${crName}:`, error);
}
}
}
} catch (error) {
console.error('Error syncing Minecraft servers to CRDs:', error);
}
}
/**
* Synchronizes Reverse Proxy server objects from the database to CRDs
*/
async function syncReverseProxyServers(
prisma: PrismaClient,
customObjectsApi: any,
namespace: string,
reflectedReverseProxyServers: Map<string, string>
): Promise<void> {
try {
const proxies = await prisma.reverseProxyServer.findMany({
include: {
env_variables: true
}
});
let existingCRs: any[] = [];
try {
const response = await customObjectsApi.listNamespacedCustomObject(
API_GROUP,
API_VERSION,
namespace,
'reverseproxyservers'
);
existingCRs = (response.body as any).items || [];
} catch (error) {
console.error('Error listing ReverseProxyServer CRs:', error);
// TODO: Potentially better error handling here
// For now, continue anyway - it might just be that none exist yet
}
// Map CR names to their corresponding DB IDs
const existingCRMap = new Map<string, string>();
// Map of CR names to their resourceVersions for updates
const crResourceVersions = new Map<string, string>();
for (const cr of existingCRs) {
const internalId = cr.status?.internalId;
if (internalId) {
existingCRMap.set(internalId, cr.metadata.name);
// Store the resourceVersion for later
if (cr.metadata?.resourceVersion) {
crResourceVersions.set(cr.metadata.name, cr.metadata.resourceVersion);
}
}
}
// Refresh tracking map
reflectedReverseProxyServers.clear();
// Create or update CRs for each proxy
for (const proxy of proxies) {
const crName = existingCRMap.get(proxy.id) || `${proxy.id.toLowerCase()}`;
// Build the CR object
const proxyCR: {
apiVersion: string,
kind: string,
metadata: {
name: string,
namespace: string,
annotations: Record<string, string>,
resourceVersion?: string
},
spec: any,
status: any
} = {
apiVersion: `${API_GROUP}/${API_VERSION}`,
kind: 'ReverseProxyServer',
metadata: {
name: crName,
namespace: namespace,
annotations: {
[`${LABEL_PREFIX}/database-managed`]: 'true',
[`${LABEL_PREFIX}/last-synced`]: new Date().toISOString()
}
},
spec: {
id: proxy.id,
description: proxy.description,
external_address: proxy.external_address,
external_port: proxy.external_port,
listen_port: proxy.listen_port,
type: proxy.type,
memory: proxy.memory,
environmentVariables: proxy.env_variables?.map(ev => ({
key: ev.key,
value: ev.value
}))
},
status: {
phase: 'Running',
message: 'Managed by database',
internalId: proxy.id,
apiKey: '[REDACTED]', // Don't expose actual API key
lastSyncedAt: new Date().toISOString()
}
};
try {
if (existingCRMap.has(proxy.id)) {
// Update existing CR
// Get the current resource first
const crName = existingCRMap.get(proxy.id)!;
try {
// Get the existing resource to get the current resourceVersion
const existingResource = await customObjectsApi.getNamespacedCustomObject(
API_GROUP,
API_VERSION,
namespace,
'reverseproxyservers',
crName
);
// Extract the resourceVersion from the existing resource
if (existingResource?.body?.metadata?.resourceVersion) {
// Add the resourceVersion to our custom resource
proxyCR.metadata.resourceVersion = existingResource.body.metadata.resourceVersion;
}
// Now update with the correct resourceVersion
await customObjectsApi.replaceNamespacedCustomObject(
API_GROUP,
API_VERSION,
namespace,
'reverseproxyservers',
crName,
proxyCR
);
console.log(`Updated ReverseProxyServer CR ${crName} for proxy ${proxy.id}`);
} catch (error) {
console.error(`Error getting/updating ReverseProxyServer CR for ${proxy.id}:`, error);
}
} else {
// Create new CR
await customObjectsApi.createNamespacedCustomObject(
API_GROUP,
API_VERSION,
namespace,
'reverseproxyservers',
proxyCR
);
console.log(`Created ReverseProxyServer CR ${crName} for proxy ${proxy.id}`);
}
// Remember this mapping
reflectedReverseProxyServers.set(proxy.id, crName);
} catch (error) {
console.error(`Error creating/updating ReverseProxyServer CR for ${proxy.id}:`, error);
}
}
// Delete CRs for proxies that no longer exist
for (const [dbId, crName] of existingCRMap.entries()) {
if (!proxies.some(p => p.id === dbId)) {
try {
await customObjectsApi.deleteNamespacedCustomObject(
API_GROUP,
API_VERSION,
namespace,
'reverseproxyservers',
crName
);
console.log(`Deleted ReverseProxyServer CR ${crName} for removed proxy ID ${dbId}`);
} catch (error) {
console.error(`Error deleting ReverseProxyServer CR ${crName}:`, error);
}
}
}
} catch (error) {
console.error('Error syncing Reverse Proxy servers to CRDs:', error);
}
}

View File

@@ -0,0 +1,101 @@
import * as k8s from '@kubernetes/client-node';
import { SKIP_TLS_VERIFY, NAMESPACE } from '../config/constants';
export class KubernetesClient {
private static instance: KubernetesClient;
private kc: k8s.KubeConfig;
private appsApi!: k8s.AppsV1Api;
private coreApi!: k8s.CoreV1Api;
private networkingApi!: k8s.NetworkingV1Api;
private customObjectsApi!: k8s.CustomObjectsApi;
private apiExtensionsApi!: k8s.ApiextensionsV1Api;
private constructor() {
if (SKIP_TLS_VERIFY) {
console.log('Disabling TLS certificate validation');
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';
}
this.kc = new k8s.KubeConfig();
this.setupConfig();
this.initializeClients();
}
static getInstance(): KubernetesClient {
if (!KubernetesClient.instance) {
KubernetesClient.instance = new KubernetesClient();
}
return KubernetesClient.instance;
}
private setupConfig(): void {
try {
this.kc.loadFromDefault();
console.log('Loaded Kubernetes config from default location');
} catch (err) {
console.warn('Failed to load Kubernetes config from default location:', err);
}
// Running in a cluster, try to load in-cluster config
if (!this.kc.getCurrentContext()) {
try {
this.kc.loadFromCluster();
console.log('Loaded Kubernetes config from cluster');
} catch (err) {
console.warn('Failed to load Kubernetes config from cluster:', err);
}
}
if (!this.kc.getCurrentContext()) {
throw new Error('Failed to setup Kubernetes client - no valid configuration found');
}
const currentCluster = this.kc.getCurrentCluster();
if (currentCluster) {
console.log(`Connecting to Kubernetes server: ${currentCluster.server}`);
}
}
private initializeClients(): void {
this.appsApi = this.kc.makeApiClient(k8s.AppsV1Api);
this.coreApi = this.kc.makeApiClient(k8s.CoreV1Api);
this.networkingApi = this.kc.makeApiClient(k8s.NetworkingV1Api);
this.customObjectsApi = this.kc.makeApiClient(k8s.CustomObjectsApi);
this.apiExtensionsApi = this.kc.makeApiClient(k8s.ApiextensionsV1Api);
}
getKubeConfig(): k8s.KubeConfig {
return this.kc;
}
getAppsApi(): k8s.AppsV1Api {
return this.appsApi;
}
getCoreApi(): k8s.CoreV1Api {
return this.coreApi;
}
getNetworkingApi(): k8s.NetworkingV1Api {
return this.networkingApi;
}
getCustomObjectsApi(): k8s.CustomObjectsApi {
return this.customObjectsApi;
}
getApiExtensionsApi(): k8s.ApiextensionsV1Api {
return this.apiExtensionsApi;
}
async handleApiError(error: any, context: string): Promise<never> {
console.error(`Kubernetes API error (${context}):`, error?.message || error);
if (error?.response) {
console.error(`Response status: ${error.response.statusCode}`);
console.error(`Response body: ${JSON.stringify(error.response.body)}`);
}
throw error;
}
}

View File

@@ -0,0 +1,38 @@
/**
* Memory utility functions for Kubernetes resources
*/
/**
* Calculate memory for Java (lower than what's requested to account for JVM overhead)
* @param memoryString Memory string in format like "512M" or "1G"
* @param factor Multiplicative factor to apply (e.g., 0.8 for 80%)
* @returns Calculated memory string in same format
*/
export function calculateJavaMemory(memoryString: string, factor: number): string {
const match = memoryString.match(/^(\d+)([MG])$/i);
if (!match) return "512M"; // Default if format is not recognized
const [, valueStr, unit] = match;
const value = parseInt(valueStr, 10);
const calculatedValue = Math.round(value * factor);
return `${calculatedValue}${unit.toUpperCase()}`;
}
/**
* Convert memory string to Kubernetes format (e.g., "1G" -> "1Gi")
* @param memoryString Memory string in format like "512M" or "1G"
* @returns Memory string in Kubernetes format
*/
export function convertToK8sFormat(memoryString: string): string {
const match = memoryString.match(/^(\d+)([MG])$/i);
if (!match) return "1Gi"; // Default if format is not recognized
const [, valueStr, unit] = match;
if (unit.toUpperCase() === 'G') {
return `${valueStr}Gi`;
} else {
return `${valueStr}Mi`;
}
}

View File

@@ -0,0 +1,212 @@
import { KubernetesClient } from './k8s-client';
import {
minikuraNamespace,
minikuraServiceAccount,
minikuraClusterRole,
minikuraClusterRoleBinding,
minikuraOperatorDeployment
} from '../crds/rbac';
import fetch from 'node-fetch';
/**
* Registers all RBAC resources required
* @param k8sClient The Kubernetes client instance
*/
export async function registerRBACResources(k8sClient: KubernetesClient): Promise<void> {
try {
console.log('Starting RBAC resources registration...');
await registerNamespace(k8sClient);
await registerServiceAccount(k8sClient);
await registerClusterRole(k8sClient);
await registerClusterRoleBinding(k8sClient);
console.log('RBAC resources registration completed successfully');
} catch (error: any) {
console.error('Error registering RBAC resources:', error.message);
if (error.response) {
console.error(`Response status: ${error.response.statusCode}`);
console.error(`Response body: ${JSON.stringify(error.response.body)}`);
}
throw error;
}
}
/**
* Registers the namespace
*/
async function registerNamespace(k8sClient: KubernetesClient): Promise<void> {
try {
const coreApi = k8sClient.getCoreApi();
await coreApi.createNamespace(minikuraNamespace);
console.log(`Created namespace ${minikuraNamespace.metadata.name}`);
} catch (error: any) {
if (error.response?.statusCode === 409) {
console.log(`Namespace ${minikuraNamespace.metadata.name} already exists`);
} else {
throw error;
}
}
}
/**
* Registers the service account
*/
async function registerServiceAccount(k8sClient: KubernetesClient): Promise<void> {
try {
const coreApi = k8sClient.getCoreApi();
await coreApi.createNamespacedServiceAccount(
minikuraServiceAccount.metadata.namespace,
minikuraServiceAccount
);
console.log(`Created service account ${minikuraServiceAccount.metadata.name}`);
} catch (error: any) {
if (error.response?.statusCode === 409) {
console.log(`Service account ${minikuraServiceAccount.metadata.name} already exists`);
} else {
throw error;
}
}
}
/**
* Registers the cluster role
*/
async function registerClusterRole(k8sClient: KubernetesClient): Promise<void> {
try {
// TODO: I can't get this working with the k8s client, so I'm using fetch directly, fix later
const kc = k8sClient.getKubeConfig();
const opts = {};
kc.applyToRequest(opts as any);
// Get cluster URL
const cluster = kc.getCurrentCluster();
if (!cluster) {
throw new Error('No active cluster found in KubeConfig');
}
try {
const response = await fetch(`${cluster.server}/apis/rbac.authorization.k8s.io/v1/clusterroles`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
...(opts as any).headers
},
body: JSON.stringify(minikuraClusterRole),
agent: (opts as any).agent
});
if (response.ok) {
console.log(`Created cluster role ${minikuraClusterRole.metadata.name}`);
} else if (response.status === 409) {
console.log(`Cluster role ${minikuraClusterRole.metadata.name} already exists`);
} else {
const text = await response.text();
throw new Error(`Failed to create cluster role: ${response.status} ${response.statusText} - ${text}`);
}
} catch (error: any) {
// If the error message contains "already exists", that's OK
if (error.message?.includes('already exists') || error.message?.includes('409')) {
console.log(`Cluster role ${minikuraClusterRole.metadata.name} already exists`);
} else {
throw error;
}
}
} catch (error: any) {
console.error(`Error registering cluster role:`, error.message);
throw error;
}
}
/**
* Registers the Minikura cluster role binding
*/
async function registerClusterRoleBinding(k8sClient: KubernetesClient): Promise<void> {
try {
// We need to use the raw client for cluster roles
const kc = k8sClient.getKubeConfig();
const opts = {};
kc.applyToRequest(opts as any);
// Get cluster URL
const cluster = kc.getCurrentCluster();
if (!cluster) {
throw new Error('No active cluster found in KubeConfig');
}
// Create the cluster role binding
const { default: fetch } = await import('node-fetch');
try {
const response = await fetch(`${cluster.server}/apis/rbac.authorization.k8s.io/v1/clusterrolebindings`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
...(opts as any).headers
},
body: JSON.stringify(minikuraClusterRoleBinding),
agent: (opts as any).agent
});
if (response.ok) {
console.log(`Created cluster role binding ${minikuraClusterRoleBinding.metadata.name}`);
} else if (response.status === 409) {
console.log(`Cluster role binding ${minikuraClusterRoleBinding.metadata.name} already exists`);
} else {
const text = await response.text();
throw new Error(`Failed to create cluster role binding: ${response.status} ${response.statusText} - ${text}`);
}
} catch (error: any) {
// If the error message contains "already exists"
// TODO: Potentially better error handling here
if (error.message?.includes('already exists') || error.message?.includes('409')) {
console.log(`Cluster role binding ${minikuraClusterRoleBinding.metadata.name} already exists`);
} else {
throw error;
}
}
} catch (error: any) {
console.error(`Error registering cluster role binding:`, error.message);
throw error;
}
}
/**
* Registers the Minikura operator deployment
* Note: This requires the secret to be created first
*/
export async function registerOperatorDeployment(
k8sClient: KubernetesClient,
registryUrl: string
): Promise<void> {
try {
// Replace the registry URL placeholder, for future use
const deployment = JSON.parse(
JSON.stringify(minikuraOperatorDeployment).replace('${REGISTRY_URL}', registryUrl)
);
const appsApi = k8sClient.getAppsApi();
await appsApi.createNamespacedDeployment(
deployment.metadata.namespace,
deployment
);
console.log(`Created deployment ${deployment.metadata.name}`);
} catch (error: any) {
if (error.response?.statusCode === 409) {
console.log(`Deployment ${minikuraOperatorDeployment.metadata.name} already exists`);
// Update the deployment if it already exists
const deployment = JSON.parse(
JSON.stringify(minikuraOperatorDeployment).replace('${REGISTRY_URL}', registryUrl)
);
await k8sClient.getAppsApi().replaceNamespacedDeployment(
deployment.metadata.name,
deployment.metadata.namespace,
deployment
);
console.log(`Updated deployment ${deployment.metadata.name}`);
} else {
throw error;
}
}
}