feat: topology, and improves handling

This commit is contained in:
2026-02-17 18:12:02 +07:00
parent e8dbefde43
commit d14f043e7c
145 changed files with 4213 additions and 2861 deletions

View File

@@ -6,10 +6,11 @@
"type": "module",
"scripts": {
"build": "tsc",
"start": "bun dist/index.js",
"dev": "bun --watch src/index.ts",
"watch": "bun --watch src/index.ts",
"apply-crds": "bun src/scripts/apply-crds.ts",
"start": "node dist/index.js",
"dev": "tsx watch src/index.ts",
"dev:bun": "bun --watch src/index.ts",
"watch": "tsx watch src/index.ts",
"apply-crds": "tsx src/scripts/apply-crds.ts",
"typecheck": "tsc --noEmit"
},
"dependencies": {
@@ -19,6 +20,9 @@
"dotenv-mono": "^1.5.1",
"node-fetch": "^3.3.2",
"pg": "^8.11.3",
"pino": "^10.3.1",
"pino-pretty": "^13.1.3",
"undici": "^7.18.2",
"yaml": "^2.6.1"
},
"devDependencies": {

View File

@@ -1,18 +1,14 @@
import { dotenvLoad } from "dotenv-mono";
const dotenv = dotenvLoad();
export const API_GROUP = "minikura.kirameki.cafe";
const _dotenv = dotenvLoad();
export { API_GROUP, LABEL_PREFIX } from "@minikura/api";
export const API_VERSION = "v1alpha1";
export const KUBERNETES_NAMESPACE_ENV = process.env.KUBERNETES_NAMESPACE;
export const NAMESPACE = process.env.KUBERNETES_NAMESPACE || "minikura";
export const ENABLE_CRD_REFLECTION = process.env.ENABLE_CRD_REFLECTION === "true";
export const SKIP_TLS_VERIFY = process.env.KUBERNETES_SKIP_TLS_VERIFY === "true";
if (SKIP_TLS_VERIFY) {
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
}
// Resource types
export const RESOURCE_TYPES = {
@@ -30,9 +26,6 @@ export const RESOURCE_TYPES = {
},
};
// Kubernetes resource label prefixes
export const LABEL_PREFIX = "minikura.kirameki.cafe";
// Polling intervals (in milliseconds)
export const SYNC_INTERVAL = 30 * 1000; // 30 seconds

View File

@@ -0,0 +1,16 @@
export const RESOURCE_DEFAULTS = {
server: {
memory: "1G",
javaMemoryFactor: 0.8,
},
proxy: {
memory: "512M",
javaMemoryFactor: 0.8,
},
} as const;
// 80% of container memory goes to JVM heap; 20% headroom
export const JAVA_MEMORY_FACTOR = 0.8;
export const DEFAULT_SERVER_MEMORY = "1G";
export const DEFAULT_PROXY_MEMORY = "512M";

View File

@@ -1,57 +1,53 @@
import type { PrismaClient } from "@minikura/db";
import { KubernetesClient } from "../utils/k8s-client";
import type { Logger } from "pino";
import { SYNC_INTERVAL } from "../config/constants";
import { KubernetesClient } from "../utils/k8s-client";
import { createLogger } from "../utils/logger";
export abstract class BaseController {
protected prisma: PrismaClient;
protected k8sClient: KubernetesClient;
protected namespace: string;
protected logger: Logger;
private intervalId: ReturnType<typeof setInterval> | null = null;
constructor(prisma: PrismaClient, namespace: string) {
this.prisma = prisma;
this.k8sClient = KubernetesClient.getInstance();
this.namespace = namespace;
this.logger = createLogger({ controller: this.getControllerName() });
}
/**
* Start watching for changes in the database and syncing to Kubernetes
*/
public startWatching(): void {
console.log(`Starting to watch for changes in ${this.getControllerName()}...`);
this.logger.info(
{ namespace: this.namespace, syncInterval: SYNC_INTERVAL },
"Starting controller watch loop"
);
// Initial sync
this.syncResources().catch((err) => {
console.error(`Error during initial sync of ${this.getControllerName()}:`, err);
this.logger.error({ err }, "Error during initial resource synchronization");
});
// Polling interval for changes
// TODO: Maybe there's a better way to do this
this.intervalId = setInterval(() => {
this.syncResources().catch((err) => {
console.error(`Error syncing ${this.getControllerName()}:`, err);
this.logger.error({ err }, "Error during periodic resource synchronization");
});
}, SYNC_INTERVAL);
this.logger.debug(
{ intervalMs: SYNC_INTERVAL },
"Polling interval established for resource synchronization"
);
}
/**
* Stop watching for changes
*/
public stopWatching(): void {
if (this.intervalId) {
clearInterval(this.intervalId);
this.intervalId = null;
console.log(`Stopped watching for changes in ${this.getControllerName()}`);
this.logger.info("Controller watch loop stopped");
}
}
/**
* Get a name for this controller for logging purposes
*/
protected abstract getControllerName(): string;
/**
* Sync resources from database to Kubernetes
*/
protected abstract syncResources(): Promise<void>;
}

View File

@@ -1,11 +1,10 @@
import type { PrismaClient } from "@minikura/db";
import type { ReverseProxyServer, CustomEnvironmentVariable } from "@minikura/db";
import { BaseController } from "./base-controller";
import type { ReverseProxyConfig } from "../types";
import type { CustomEnvironmentVariable, ReverseProxyServer } from "@minikura/db";
import {
createReverseProxyServer,
deleteReverseProxyServer,
} from "../resources/reverseProxyServer";
import type { ReverseProxyConfig } from "../types";
import { BaseController } from "./base-controller";
type ReverseProxyWithEnvVars = ReverseProxyServer & {
env_variables: CustomEnvironmentVariable[];
@@ -14,10 +13,6 @@ type ReverseProxyWithEnvVars = ReverseProxyServer & {
export class ReverseProxyController extends BaseController {
private deployedProxies = new Map<string, ReverseProxyWithEnvVars>();
constructor(prisma: PrismaClient, namespace: string) {
super(prisma, namespace);
}
protected getControllerName(): string {
return "ReverseProxyController";
}
@@ -36,25 +31,32 @@ export class ReverseProxyController extends BaseController {
const currentProxyIds = new Set(proxies.map((proxy) => proxy.id));
// Delete reverse proxy servers that are no longer in the database
for (const [proxyId, proxy] of this.deployedProxies.entries()) {
if (!currentProxyIds.has(proxyId)) {
console.log(
`Reverse proxy server ${proxy.id} (${proxyId}) has been removed from the database, deleting from Kubernetes...`
this.logger.info(
{ proxyId, proxyType: proxy.type },
"Reverse proxy removed from database, deleting K8s resources"
);
await deleteReverseProxyServer(proxy.id, proxy.type, appsApi, coreApi, this.namespace);
this.deployedProxies.delete(proxyId);
}
}
// Create or update reverse proxy servers that are in the database
for (const proxy of proxies) {
const deployedProxy = this.deployedProxies.get(proxy.id);
// If proxy doesn't exist yet or has been updated
if (!deployedProxy || this.hasProxyChanged(deployedProxy, proxy)) {
console.log(
`${!deployedProxy ? "Creating" : "Updating"} reverse proxy server ${proxy.id} (${proxy.id}) in Kubernetes...`
const action = !deployedProxy ? "Creating" : "Updating";
this.logger.info(
{
proxyId: proxy.id,
proxyType: proxy.type,
action: action.toLowerCase(),
externalAddress: proxy.external_address,
externalPort: proxy.external_port,
listenPort: proxy.listen_port,
},
`${action} reverse proxy server in Kubernetes`
);
const proxyConfig: ReverseProxyConfig = {
@@ -66,6 +68,7 @@ export class ReverseProxyController extends BaseController {
apiKey: proxy.api_key,
type: proxy.type,
memory: proxy.memory,
service_type: proxy.service_type,
env_variables: proxy.env_variables?.map((ev) => ({
key: ev.key,
value: ev.value,
@@ -80,12 +83,11 @@ export class ReverseProxyController extends BaseController {
this.namespace
);
// Update cache
this.deployedProxies.set(proxy.id, { ...proxy });
}
}
} catch (error) {
console.error("Error syncing reverse proxy servers:", error);
this.logger.error({ err: error }, "Failed to sync reverse proxy servers to Kubernetes");
throw error;
}
}
@@ -94,20 +96,20 @@ export class ReverseProxyController extends BaseController {
oldProxy: ReverseProxyWithEnvVars,
newProxy: ReverseProxyWithEnvVars
): boolean {
// Check basic properties
const basicPropsChanged =
oldProxy.external_address !== newProxy.external_address ||
oldProxy.external_port !== newProxy.external_port ||
oldProxy.listen_port !== newProxy.listen_port ||
oldProxy.description !== newProxy.description;
oldProxy.description !== newProxy.description ||
oldProxy.service_type !== newProxy.service_type;
if (basicPropsChanged) return true;
// Check if environment variables have changed
const oldEnvVars = oldProxy.env_variables || [];
const newEnvVars = newProxy.env_variables || [];
if (oldEnvVars.length !== newEnvVars.length) return true;
for (const newEnv of newEnvVars) {
const oldEnv = oldEnvVars.find((e) => e.key === newEnv.key);
if (!oldEnv || oldEnv.value !== newEnv.value) {

View File

@@ -1,8 +1,7 @@
import { type PrismaClient, ServerType } from "@minikura/db";
import type { Server, CustomEnvironmentVariable } from "@minikura/db";
import { BaseController } from "./base-controller";
import type { ServerConfig } from "../types";
import type { CustomEnvironmentVariable, Server } from "@minikura/db";
import { createServer, deleteServer } from "../resources/server";
import type { ServerConfig } from "../types";
import { BaseController } from "./base-controller";
type ServerWithEnvVars = Server & {
env_variables: CustomEnvironmentVariable[];
@@ -11,10 +10,6 @@ type ServerWithEnvVars = Server & {
export class ServerController extends BaseController {
private deployedServers = new Map<string, ServerWithEnvVars>();
constructor(prisma: PrismaClient, namespace: string) {
super(prisma, namespace);
}
protected getControllerName(): string {
return "ServerController";
}
@@ -33,25 +28,31 @@ export class ServerController extends BaseController {
const currentServerIds = new Set(servers.map((server) => server.id));
// Delete servers that are no longer in the database
for (const [serverId, server] of this.deployedServers.entries()) {
if (!currentServerIds.has(serverId)) {
console.log(
`Server ${server.id} (${serverId}) has been removed from the database, deleting from Kubernetes...`
this.logger.info(
{ serverId, serverName: server.id },
"Server removed from database, deleting K8s resources"
);
await deleteServer(serverId, appsApi, coreApi, this.namespace);
this.deployedServers.delete(serverId);
}
}
// Create or update servers that are in the database
for (const server of servers) {
const deployedServer = this.deployedServers.get(server.id);
// If server doesn't exist yet or has been updated
if (!deployedServer || this.hasServerChanged(deployedServer, server)) {
console.log(
`${!deployedServer ? "Creating" : "Updating"} server ${server.id} (${server.id}) in Kubernetes...`
const action = !deployedServer ? "Creating" : "Updating";
this.logger.info(
{
serverId: server.id,
serverType: server.type,
action: action.toLowerCase(),
memory: server.memory,
port: server.listen_port,
},
`${action} Minecraft server in Kubernetes`
);
const serverConfig: ServerConfig = {
@@ -61,6 +62,7 @@ export class ServerController extends BaseController {
description: server.description,
listen_port: server.listen_port,
memory: server.memory,
service_type: server.service_type,
env_variables: server.env_variables?.map((ev) => ({
key: ev.key,
value: ev.value,
@@ -69,33 +71,29 @@ export class ServerController extends BaseController {
await createServer(serverConfig, appsApi, coreApi, networkingApi, this.namespace);
// Update cache
this.deployedServers.set(server.id, { ...server });
}
}
} catch (error) {
console.error("Error syncing servers:", error);
this.logger.error({ err: error }, "Failed to sync servers to Kubernetes");
throw error;
}
}
private hasServerChanged(oldServer: ServerWithEnvVars, newServer: ServerWithEnvVars): boolean {
// Check basic properties
const basicPropsChanged =
oldServer.type !== newServer.type ||
oldServer.listen_port !== newServer.listen_port ||
oldServer.description !== newServer.description;
oldServer.description !== newServer.description ||
oldServer.service_type !== newServer.service_type;
if (basicPropsChanged) return true;
// Check if environment variables have changed
const oldEnvVars = oldServer.env_variables || [];
const newEnvVars = newServer.env_variables || [];
// Check if the number of env vars has changed
if (oldEnvVars.length !== newEnvVars.length) return true;
// Check if any of the existing env vars have changed
for (const newEnv of newEnvVars) {
const oldEnv = oldEnvVars.find((e) => e.key === newEnv.key);
if (!oldEnv || oldEnv.value !== newEnv.value) {

View File

@@ -1,8 +1,5 @@
import { NAMESPACE } from "../config/constants";
import { API_GROUP, NAMESPACE } from "../config/constants";
/**
* Namespace definition
*/
export const minikuraNamespace = {
apiVersion: "v1",
kind: "Namespace",
@@ -11,9 +8,6 @@ export const minikuraNamespace = {
},
};
/**
* Service account
*/
export const minikuraServiceAccount = {
apiVersion: "v1",
kind: "ServiceAccount",
@@ -23,9 +17,6 @@ export const minikuraServiceAccount = {
},
};
/**
* Cluster role
*/
export const minikuraClusterRole = {
apiVersion: "rbac.authorization.k8s.io/v1",
kind: "ClusterRole",
@@ -54,21 +45,18 @@ export const minikuraClusterRole = {
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"],
},
{
apiGroups: ["minikura.kirameki.cafe"],
apiGroups: [API_GROUP],
resources: ["minecraftservers", "velocityproxies"],
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"],
},
{
apiGroups: ["minikura.kirameki.cafe"],
apiGroups: [API_GROUP],
resources: ["minecraftservers/status", "velocityproxies/status"],
verbs: ["get", "update", "patch"],
},
],
};
/**
* Cluster role binding
*/
export const minikuraClusterRoleBinding = {
apiVersion: "rbac.authorization.k8s.io/v1",
kind: "ClusterRoleBinding",
@@ -89,9 +77,6 @@ export const minikuraClusterRoleBinding = {
},
};
/**
* Deployment for the Minikura operator
*/
export const minikuraOperatorDeployment = {
apiVersion: "apps/v1",
kind: "Deployment",

View File

@@ -1,20 +1,24 @@
import { dotenvLoad } from "dotenv-mono";
const dotenv = dotenvLoad();
import { NAMESPACE, KUBERNETES_NAMESPACE_ENV, ENABLE_CRD_REFLECTION } from "./config/constants";
const _dotenv = dotenvLoad();
import { prisma } from "@minikura/db";
import { KubernetesClient } from "./utils/k8s-client";
import { ServerController } from "./controllers/server-controller";
import { ENABLE_CRD_REFLECTION, NAMESPACE } from "./config/constants";
import { ReverseProxyController } from "./controllers/reverse-proxy-controller";
import { ServerController } from "./controllers/server-controller";
import { setupCRDRegistration } from "./utils/crd-registrar";
import { KubernetesClient } from "./utils/k8s-client";
import { logger } from "./utils/logger";
async function main() {
console.log("Starting Minikura Kubernetes Operator...");
console.log(`Using namespace: ${NAMESPACE}`);
logger.info(
{ namespace: NAMESPACE, crdReflection: ENABLE_CRD_REFLECTION },
"Starting Minikura Kubernetes Operator"
);
try {
const k8sClient = KubernetesClient.getInstance();
console.log("Connected to Kubernetes cluster");
logger.info({ namespace: NAMESPACE }, "Successfully connected to Kubernetes cluster");
const serverController = new ServerController(prisma, NAMESPACE);
const reverseProxyController = new ReverseProxyController(prisma, NAMESPACE);
@@ -23,49 +27,55 @@ async function main() {
reverseProxyController.startWatching();
if (ENABLE_CRD_REFLECTION) {
console.log("CRD reflection enabled - will create CRDs to reflect database state");
logger.info("CRD reflection enabled - will create Custom Resources to mirror database state");
try {
await setupCRDRegistration(prisma, k8sClient, NAMESPACE);
logger.info("CRD registration completed successfully");
} catch (error: any) {
console.error(`Failed to set up CRD registration: ${error.message}`);
if (error.response) {
console.error(`Response status: ${error.response.statusCode}`);
console.error(`Response body: ${JSON.stringify(error.response.body)}`);
}
console.error("Continuing operation without CRD reflection");
console.log(
"Kubernetes resources will still be created/updated, but CRD reflection is disabled"
logger.error(
{
err: error,
message: error.message,
statusCode: error.response?.statusCode,
body: error.response?.body,
},
"Failed to setup CRD registration, continuing without CRD reflection"
);
logger.warn(
"Kubernetes resources (Deployments, Services) will still be created, but Custom Resources will not be reflected"
);
}
}
console.log("Minikura Kubernetes Operator is running");
logger.info("Minikura Kubernetes Operator is now running and watching for changes");
process.on("SIGINT", gracefulShutdown);
process.on("SIGTERM", gracefulShutdown);
function gracefulShutdown() {
console.log("Shutting down operator gracefully...");
logger.info("Received shutdown signal, shutting down gracefully");
serverController.stopWatching();
reverseProxyController.stopWatching();
prisma.$disconnect();
console.log("Resources released, exiting...");
logger.info("All resources released, exiting process");
process.exit(0);
}
} catch (error: any) {
console.error(`Failed to start Minikura Kubernetes Operator: ${error.message}`);
if (error.response) {
console.error(`Response status: ${error.response.statusCode}`);
console.error(`Response body: ${JSON.stringify(error.response.body)}`);
}
if (error.stack) {
console.error(`Stack trace: ${error.stack}`);
}
logger.fatal(
{
err: error,
message: error.message,
statusCode: error.response?.statusCode,
body: error.response?.body,
stack: error.stack,
},
"Failed to start Minikura Kubernetes Operator"
);
process.exit(1);
}
}
main().catch((error) => {
console.error("Unhandled error:", error);
logger.fatal({ err: error }, "Unhandled error in main process");
process.exit(1);
});

View File

@@ -1,8 +1,11 @@
import type * as k8s from "@kubernetes/client-node";
import type { ReverseProxyServerType } from "@minikura/db";
import { LABEL_PREFIX } from "../config/constants";
import { calculateJavaMemory, convertToK8sFormat } from "../utils/memory";
import { DEFAULT_PROXY_MEMORY, JAVA_MEMORY_FACTOR } from "../config/resource-defaults";
import type { ReverseProxyConfig } from "../types";
import { logger } from "../utils/logger";
import { calculateJavaMemory, convertToK8sFormat } from "../utils/memory";
import { mapServiceType } from "../utils/service-type";
export async function createReverseProxyServer(
server: ReverseProxyConfig,
@@ -11,7 +14,10 @@ export async function createReverseProxyServer(
_networkingApi: k8s.NetworkingV1Api,
namespace: string
): Promise<void> {
console.log(`Creating reverse proxy server ${server.id} in namespace '${namespace}'`);
logger.debug(
{ proxyId: server.id, proxyType: server.type, namespace },
"Creating reverse proxy server"
);
const serverType = server.type.toLowerCase();
const serverName = `${serverType}-${server.id}`;
@@ -35,11 +41,15 @@ export async function createReverseProxyServer(
try {
await coreApi.createNamespacedConfigMap({ namespace, body: configMap });
console.log(`Created ConfigMap for reverse proxy server ${server.id}`);
logger.debug({ proxyId: server.id, resource: "ConfigMap" }, "Created ConfigMap");
} catch (error: any) {
if (error.response?.statusCode === 409) {
await coreApi.replaceNamespacedConfigMap({ name: `${serverName}-config`, namespace, body: configMap });
console.log(`Updated ConfigMap for reverse proxy server ${server.id}`);
if (error.code === 409) {
await coreApi.replaceNamespacedConfigMap({
name: `${serverName}-config`,
namespace,
body: configMap,
});
logger.debug({ proxyId: server.id, resource: "ConfigMap" }, "Updated ConfigMap");
} else {
throw error;
}
@@ -69,17 +79,17 @@ export async function createReverseProxyServer(
name: "minecraft",
},
],
type: "LoadBalancer",
type: mapServiceType(server.service_type, "LoadBalancer"),
},
};
try {
await coreApi.createNamespacedService({ namespace, body: service });
console.log(`Created Service for reverse proxy server ${server.id}`);
logger.debug({ proxyId: server.id, resource: "Service" }, "Created Service");
} catch (error: any) {
if (error.response?.statusCode === 409) {
if (error.code === 409) {
await coreApi.replaceNamespacedService({ name: serverName, namespace, body: service });
console.log(`Updated Service for reverse proxy server ${server.id}`);
logger.debug({ proxyId: server.id, resource: "Service" }, "Updated Service");
} else {
throw error;
}
@@ -134,7 +144,10 @@ export async function createReverseProxyServer(
},
{
name: "MEMORY",
value: calculateJavaMemory(server.memory || "512M", 0.8),
value: calculateJavaMemory(
server.memory || DEFAULT_PROXY_MEMORY,
JAVA_MEMORY_FACTOR
),
},
...(server.env_variables || []).map((ev) => ({
name: ev.key,
@@ -150,11 +163,11 @@ export async function createReverseProxyServer(
},
resources: {
requests: {
memory: convertToK8sFormat(server.memory || "512M"),
memory: convertToK8sFormat(server.memory || DEFAULT_PROXY_MEMORY),
cpu: "250m",
},
limits: {
memory: convertToK8sFormat(server.memory || "512M"),
memory: convertToK8sFormat(server.memory || DEFAULT_PROXY_MEMORY),
cpu: "500m",
},
},
@@ -167,11 +180,11 @@ export async function createReverseProxyServer(
try {
await appsApi.createNamespacedDeployment({ namespace, body: deployment });
console.log(`Created Deployment for reverse proxy server ${server.id}`);
logger.debug({ proxyId: server.id, resource: "Deployment" }, "Created Deployment");
} catch (error: any) {
if (error.response?.statusCode === 409) {
if (error.code === 409) {
await appsApi.replaceNamespacedDeployment({ name: serverName, namespace, body: deployment });
console.log(`Updated Deployment for reverse proxy server ${server.id}`);
logger.debug({ proxyId: server.id, resource: "Deployment" }, "Updated Deployment");
} else {
throw error;
}
@@ -190,28 +203,28 @@ export async function deleteReverseProxyServer(
try {
await appsApi.deleteNamespacedDeployment({ name, namespace });
console.log(`Deleted Deployment for reverse proxy server ${proxyId}`);
logger.debug({ proxyId, resource: "Deployment" }, "Deleted Deployment");
} catch (error: any) {
if (error.response?.statusCode !== 404) {
console.error(`Error deleting Deployment for reverse proxy server ${proxyId}:`, error);
logger.error({ err: error, proxyId, resource: "Deployment" }, "Failed to delete Deployment");
}
}
try {
await coreApi.deleteNamespacedService({ name, namespace });
console.log(`Deleted Service for reverse proxy server ${proxyId}`);
logger.debug({ proxyId, resource: "Service" }, "Deleted Service");
} catch (error: any) {
if (error.response?.statusCode !== 404) {
console.error(`Error deleting Service for reverse proxy server ${proxyId}:`, error);
logger.error({ err: error, proxyId, resource: "Service" }, "Failed to delete Service");
}
}
try {
await coreApi.deleteNamespacedConfigMap({ name: `${name}-config`, namespace });
console.log(`Deleted ConfigMap for reverse proxy server ${proxyId}`);
logger.debug({ proxyId, resource: "ConfigMap" }, "Deleted ConfigMap");
} catch (error: any) {
if (error.response?.statusCode !== 404) {
console.error(`Error deleting ConfigMap for reverse proxy server ${proxyId}:`, error);
logger.error({ err: error, proxyId, resource: "ConfigMap" }, "Failed to delete ConfigMap");
}
}
}

View File

@@ -1,8 +1,11 @@
import type * as k8s from "@kubernetes/client-node";
import { ServerType } from "@minikura/db";
import { LABEL_PREFIX } from "../config/constants";
import { DEFAULT_SERVER_MEMORY, JAVA_MEMORY_FACTOR } from "../config/resource-defaults";
import type { ServerConfig } from "../types";
import { logger } from "../utils/logger";
import { calculateJavaMemory, convertToK8sFormat } from "../utils/memory";
import { mapServiceType } from "../utils/service-type";
export async function createServer(
server: ServerConfig,
@@ -33,15 +36,15 @@ export async function createServer(
try {
await coreApi.createNamespacedConfigMap({ namespace, body: configMap });
console.log(`Created ConfigMap for server ${server.id}`);
logger.debug({ serverId: server.id, resource: "ConfigMap" }, "Created ConfigMap");
} catch (err: any) {
if (err.response?.statusCode === 409) {
if (err.code === 409) {
await coreApi.replaceNamespacedConfigMap({
name: `${serverName}-config`,
namespace,
body: configMap,
});
console.log(`Updated ConfigMap for server ${server.id}`);
logger.debug({ serverId: server.id, resource: "ConfigMap" }, "Updated ConfigMap");
} else {
throw err;
}
@@ -71,17 +74,20 @@ export async function createServer(
name: "minecraft",
},
],
type: "ClusterIP",
type: mapServiceType(server.service_type),
},
};
try {
await coreApi.createNamespacedService({ namespace, body: service });
console.log(`Created Service for server ${server.id}`);
logger.debug(
{ serverId: server.id, resource: "Service", port: server.listen_port },
"Created Service"
);
} catch (err: any) {
if (err.response?.statusCode === 409) {
if (err.code === 409) {
await coreApi.replaceNamespacedService({ name: serverName, namespace, body: service });
console.log(`Updated Service for server ${server.id}`);
logger.debug({ serverId: server.id, resource: "Service" }, "Updated Service");
} else {
throw err;
}
@@ -149,7 +155,10 @@ async function createDeployment(
},
{
name: "MEMORY",
value: calculateJavaMemory(server.memory || "1G", 0.8),
value: calculateJavaMemory(
server.memory || DEFAULT_SERVER_MEMORY,
JAVA_MEMORY_FACTOR
),
},
{
name: "OPS",
@@ -183,11 +192,11 @@ async function createDeployment(
},
resources: {
requests: {
memory: convertToK8sFormat(server.memory || "1G"),
memory: convertToK8sFormat(server.memory || DEFAULT_SERVER_MEMORY),
cpu: "250m",
},
limits: {
memory: convertToK8sFormat(server.memory || "1G"),
memory: convertToK8sFormat(server.memory || DEFAULT_SERVER_MEMORY),
cpu: "500m",
},
},
@@ -208,11 +217,11 @@ async function createDeployment(
try {
await appsApi.createNamespacedDeployment({ namespace, body: deployment });
console.log(`Created Deployment for server ${server.id}`);
logger.debug({ serverId: server.id, resource: "Deployment" }, "Created Deployment");
} catch (err: any) {
if (err.response?.statusCode === 409) {
if (err.code === 409) {
await appsApi.replaceNamespacedDeployment({ name: serverName, namespace, body: deployment });
console.log(`Updated Deployment for server ${server.id}`);
logger.debug({ serverId: server.id, resource: "Deployment" }, "Updated Deployment");
} else {
throw err;
}
@@ -275,7 +284,10 @@ async function createStatefulSet(
},
{
name: "MEMORY",
value: calculateJavaMemory(server.memory || "1G", 0.8),
value: calculateJavaMemory(
server.memory || DEFAULT_SERVER_MEMORY,
JAVA_MEMORY_FACTOR
),
},
{
name: "OPS",
@@ -353,15 +365,15 @@ async function createStatefulSet(
try {
await appsApi.createNamespacedStatefulSet({ namespace, body: statefulSet });
console.log(`Created StatefulSet for server ${server.id}`);
logger.debug({ serverId: server.id, resource: "StatefulSet" }, "Created StatefulSet");
} catch (err: any) {
if (err.response?.statusCode === 409) {
if (err.code === 409) {
await appsApi.replaceNamespacedStatefulSet({
name: serverName,
namespace,
body: statefulSet,
});
console.log(`Updated StatefulSet for server ${server.id}`);
logger.debug({ serverId: server.id, resource: "StatefulSet" }, "Updated StatefulSet");
} else {
throw err;
}
@@ -378,37 +390,37 @@ export async function deleteServer(
try {
await appsApi.deleteNamespacedDeployment({ name: serverName, namespace });
console.log(`Deleted Deployment for server ${serverName}`);
logger.debug({ serverName, resource: "Deployment" }, "Deleted Deployment");
} catch (err: any) {
if (err.response?.statusCode !== 404) {
console.error(`Error deleting Deployment for server ${serverName}:`, err);
logger.error({ err, serverName, resource: "Deployment" }, "Failed to delete Deployment");
}
}
try {
await appsApi.deleteNamespacedStatefulSet({ name: serverName, namespace });
console.log(`Deleted StatefulSet for server ${serverName}`);
logger.debug({ serverName, resource: "StatefulSet" }, "Deleted StatefulSet");
} catch (err: any) {
if (err.response?.statusCode !== 404) {
console.error(`Error deleting StatefulSet for server ${serverName}:`, err);
logger.error({ err, serverName, resource: "StatefulSet" }, "Failed to delete StatefulSet");
}
}
try {
await coreApi.deleteNamespacedService({ name: serverName, namespace });
console.log(`Deleted Service for server ${serverName}`);
logger.debug({ serverName, resource: "Service" }, "Deleted Service");
} catch (err: any) {
if (err.response?.statusCode !== 404) {
console.error(`Error deleting Service for server ${serverName}:`, err);
logger.error({ err, serverName, resource: "Service" }, "Failed to delete Service");
}
}
try {
await coreApi.deleteNamespacedConfigMap({ name: `${serverName}-config`, namespace });
console.log(`Deleted ConfigMap for server ${serverName}`);
logger.debug({ serverName, resource: "ConfigMap" }, "Deleted ConfigMap");
} catch (err: any) {
if (err.response?.statusCode !== 404) {
console.error(`Error deleting ConfigMap for server ${serverName}:`, err);
logger.error({ err, serverName, resource: "ConfigMap" }, "Failed to delete ConfigMap");
}
}
}

View File

@@ -1,9 +1,9 @@
import { KubernetesClient } from "../utils/k8s-client";
import { registerRBACResources } from "../utils/rbac-registrar";
import { setupCRDRegistration } from "../utils/crd-registrar";
import { NAMESPACE } from "../config/constants";
import { PrismaClient } from "@minikura/db";
import { dotenvLoad } from "dotenv-mono";
import { NAMESPACE } from "../config/constants";
import { setupCRDRegistration } from "../utils/crd-registrar";
import { KubernetesClient } from "../utils/k8s-client";
import { registerRBACResources } from "../utils/rbac-registrar";
dotenvLoad();

View File

@@ -1,5 +1,8 @@
import { createLogger } from "@minikura/shared";
import pg from "pg";
const logger = createLogger("notification-service");
export class NotificationService {
private pgClient: pg.Client | null = null;
private handlers = new Map<string, Set<(payload: unknown) => void | Promise<void>>>();
@@ -9,7 +12,7 @@ export class NotificationService {
throw new Error("Database connection string is required");
}
console.log("\n[NotificationService] Connecting to PostgreSQL...");
logger.info("Connecting to PostgreSQL");
this.pgClient = new pg.Client({ connectionString });
await this.pgClient.connect();
@@ -19,27 +22,21 @@ export class NotificationService {
try {
const payload = msg.payload ? JSON.parse(msg.payload) : {};
console.log(
`\n[NotificationService] Received notification on channel '${msg.channel}':`,
payload
);
logger.info({ channel: msg.channel, payload }, "Received notification");
for (const handler of handlers) {
try {
await handler(payload);
} catch (err) {
console.error(
`[NotificationService] Error in handler for channel '${msg.channel}':`,
err
);
logger.error({ err, channel: msg.channel }, "Error in notification handler");
}
}
} catch (err) {
console.error(`[NotificationService] Failed to parse notification payload:`, err);
logger.error({ err }, "Failed to parse notification payload");
}
});
console.log("[NotificationService] Connected successfully");
logger.info("Connected to PostgreSQL successfully");
}
async listen(
@@ -53,10 +50,10 @@ export class NotificationService {
if (!this.handlers.has(channel)) {
this.handlers.set(channel, new Set());
await this.pgClient.query(`LISTEN ${channel}`);
console.log(`[NotificationService] Listening on channel: ${channel}`);
logger.info({ channel }, "Listening on channel");
}
this.handlers.get(channel)!.add(handler);
this.handlers.get(channel)?.add(handler);
}
async unlisten(channel: string): Promise<void> {
@@ -64,17 +61,17 @@ export class NotificationService {
this.handlers.delete(channel);
await this.pgClient.query(`UNLISTEN ${channel}`);
console.log(`[NotificationService] Stopped listening on channel: ${channel}`);
logger.info({ channel }, "Stopped listening on channel");
}
async disconnect(): Promise<void> {
if (!this.pgClient) return;
console.log("\n[NotificationService] Disconnecting...");
logger.info("Disconnecting from PostgreSQL");
await this.pgClient.end();
this.pgClient = null;
this.handlers.clear();
console.log("[NotificationService] Disconnected");
logger.info("Disconnected from PostgreSQL");
}
isConnected(): boolean {

View File

@@ -1,9 +1,7 @@
import type {
ServerType,
ReverseProxyServerType,
Server as PrismaServer,
ReverseProxyServer as PrismaReverseProxyServer,
CustomEnvironmentVariable,
ReverseProxyServer as PrismaReverseProxyServer,
Server as PrismaServer,
} from "@minikura/db";
// Base interface
@@ -21,7 +19,7 @@ export interface CustomResource {
export type ServerConfig = Pick<
PrismaServer,
"id" | "description" | "type" | "listen_port" | "memory"
"id" | "description" | "type" | "listen_port" | "memory" | "service_type"
> & {
apiKey: string;
env_variables?: Array<Pick<CustomEnvironmentVariable, "key" | "value">>;
@@ -51,7 +49,14 @@ export interface MinecraftServerCRD extends CustomResource {
export type ReverseProxyConfig = Pick<
PrismaReverseProxyServer,
"id" | "description" | "external_address" | "external_port" | "listen_port" | "type" | "memory"
| "id"
| "description"
| "external_address"
| "external_port"
| "listen_port"
| "type"
| "memory"
| "service_type"
> & {
apiKey: string;
env_variables?: Array<Pick<CustomEnvironmentVariable, "key" | "value">>;

View File

@@ -0,0 +1,25 @@
import type * as k8s from "@kubernetes/client-node";
export interface K8sApiError extends Error {
code?: number;
body?: string;
headers?: Record<string, string>;
}
export interface CustomResourceResponse<T = unknown> {
metadata?: k8s.V1ObjectMeta;
spec?: T;
status?: Record<string, unknown>;
body?: CustomResourceResponse<T>;
}
export interface CustomResourceListResponse<T = unknown> {
items?: CustomResourceResponse<T>[];
body?: {
items?: CustomResourceResponse<T>[];
};
}
export function isK8sApiError(error: unknown): error is K8sApiError {
return error instanceof Error && ("code" in error || "body" in error || "headers" in error);
}

View File

@@ -1,61 +1,134 @@
import type * as k8s from "@kubernetes/client-node";
import type { PrismaClient } from "@minikura/db";
import type { Server, ReverseProxyServer, CustomEnvironmentVariable } from "@minikura/db";
import type { KubernetesClient } from "./k8s-client";
import { API_GROUP, API_VERSION, LABEL_PREFIX } from "../config/constants";
import { MINECRAFT_SERVER_CRD } from "../crds/server";
import { REVERSE_PROXY_SERVER_CRD } from "../crds/reverseProxy";
import { MINECRAFT_SERVER_CRD } from "../crds/server";
import type { CustomResourceListResponse, CustomResourceResponse } from "../types/k8s-types";
import { isK8sApiError } from "../types/k8s-types";
import type { KubernetesClient } from "./k8s-client";
import { logger } from "./logger";
async function retryWithBackoff<T>(
operation: () => Promise<T>,
options: {
maxRetries?: number;
initialDelay?: number;
maxDelay?: number;
operationName?: string;
} = {}
): Promise<T> {
const {
maxRetries = 5,
initialDelay = 1000,
maxDelay = 10000,
operationName = "operation",
} = options;
let lastError: Error | undefined;
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
return await operation();
} catch (error: unknown) {
lastError = error as Error;
const is429 = isK8sApiError(error) && error.code === 429;
const isStorageInitializing =
isK8sApiError(error) && error.body?.includes("storage is (re)initializing");
if (!is429 && !isStorageInitializing) {
throw error;
}
if (attempt === maxRetries) {
logger.error({ operationName, maxRetries }, "Operation failed after max retries");
throw error;
}
let delay = initialDelay * 2 ** attempt;
if (isK8sApiError(error) && error.headers?.["retry-after"]) {
const retryAfter = parseInt(error.headers["retry-after"], 10);
if (!Number.isNaN(retryAfter)) {
delay = retryAfter * 1000;
}
}
delay = Math.min(delay, maxDelay);
const errorMessage = error instanceof Error ? error.message : String(error);
logger.warn(
{
operationName,
attempt: attempt + 1,
maxAttempts: maxRetries + 1,
delayMs: delay,
errorMessage,
},
"Operation failed, retrying with backoff"
);
await new Promise((resolve) => setTimeout(resolve, delay));
}
}
throw lastError;
}
/**
* Sets up the CRD registration and starts a reflector to sync database state to CRDs
*/
export async function setupCRDRegistration(
prisma: PrismaClient,
k8sClient: KubernetesClient,
namespace: string
): Promise<void> {
await registerCRDs(k8sClient);
logger.info("Waiting for Kubernetes storage to stabilize after CRD registration");
await new Promise((resolve) => setTimeout(resolve, 2000));
await startCRDReflector(prisma, k8sClient, namespace);
}
/**
* Registers the CRDs with the Kubernetes API
*/
async function registerCRDs(k8sClient: KubernetesClient): Promise<void> {
try {
const apiExtensionsClient = k8sClient.getApiExtensionsApi();
console.log("Registering CRDs...");
logger.info(
{ apiGroup: API_GROUP, apiVersion: API_VERSION },
"Registering Custom Resource Definitions"
);
try {
await apiExtensionsClient.createCustomResourceDefinition({ body: MINECRAFT_SERVER_CRD });
console.log(`MinecraftServer CRD created successfully (${API_GROUP}/${API_VERSION})`);
logger.info(
{ crd: "MinecraftServer", apiGroup: API_GROUP, apiVersion: API_VERSION },
"CRD created successfully"
);
} catch (error: any) {
if (error.response?.statusCode === 409) {
console.log("MinecraftServer CRD already exists");
if (error.code === 409) {
logger.debug("MinecraftServer CRD already exists, skipping creation");
} else {
console.error("Error creating MinecraftServer CRD:", error);
logger.error({ err: error }, "Failed to create MinecraftServer CRD");
throw error;
}
}
try {
await apiExtensionsClient.createCustomResourceDefinition({ body: REVERSE_PROXY_SERVER_CRD });
console.log(`ReverseProxyServer CRD created successfully (${API_GROUP}/${API_VERSION})`);
logger.info(
{ crd: "ReverseProxyServer", apiGroup: API_GROUP, apiVersion: API_VERSION },
"CRD created successfully"
);
} catch (error: any) {
if (error.response?.statusCode === 409) {
console.log("ReverseProxyServer CRD already exists");
if (error.code === 409) {
logger.debug("ReverseProxyServer CRD already exists, skipping creation");
} else {
console.error("Error creating ReverseProxyServer CRD:", error);
logger.error({ err: error }, "Failed to create ReverseProxyServer CRD");
throw error;
}
}
} catch (error) {
console.error("Error registering CRDs:", error);
logger.error({ err: error }, "Failed to register CRDs");
throw error;
}
}
/**
* Starts a reflector to sync database state to CRDs
*/
async function startCRDReflector(
prisma: PrismaClient,
k8sClient: KubernetesClient,
@@ -63,13 +136,11 @@ async function startCRDReflector(
): Promise<void> {
const customObjectsApi = k8sClient.getCustomObjectsApi();
// Keep track which server IDs have corresponding CRs
const reflectedMinecraftServers = new Map<string, string>(); // DB ID -> CR name
const reflectedReverseProxyServers = new Map<string, string>(); // DB ID -> CR name
const reflectedMinecraftServers = new Map<string, string>();
const reflectedReverseProxyServers = new Map<string, string>();
console.log("Starting CRD reflector...");
logger.info("Starting CRD reflector to sync database state to custom resources");
// Initial sync to create CRs that reflect the DB state
await syncDBtoCRDs(
prisma,
customObjectsApi,
@@ -78,8 +149,6 @@ async function startCRDReflector(
reflectedReverseProxyServers
);
// Polling interval to check for changes in the DB
// TODO: Make this listener instead
setInterval(async () => {
await syncDBtoCRDs(
prisma,
@@ -91,18 +160,15 @@ async function startCRDReflector(
}, 30 * 1000);
}
/**
* Synchronizes database state to CRDs
*/
async function syncDBtoCRDs(
prisma: PrismaClient,
customObjectsApi: any,
customObjectsApi: k8s.CustomObjectsApi,
namespace: string,
reflectedMinecraftServers: Map<string, string>,
reflectedReverseProxyServers: Map<string, string>
): Promise<void> {
try {
console.log(`[${new Date().toISOString()}] Starting CRD sync operation...`);
logger.debug("Starting CRD sync operation");
await syncMinecraftServers(prisma, customObjectsApi, namespace, reflectedMinecraftServers);
await syncReverseProxyServers(
prisma,
@@ -110,18 +176,15 @@ async function syncDBtoCRDs(
namespace,
reflectedReverseProxyServers
);
console.log(`[${new Date().toISOString()}] CRD sync operation completed`);
logger.debug("CRD sync operation completed successfully");
} catch (error) {
console.error(`[${new Date().toISOString()}] Error syncing database to CRDs:`, error);
logger.error({ err: error }, "Failed to sync database to CRDs");
}
}
/**
* Synchronizes Minecraft server objects from the database to CRDs
*/
async function syncMinecraftServers(
prisma: PrismaClient,
customObjectsApi: any,
customObjectsApi: k8s.CustomObjectsApi,
namespace: string,
reflectedMinecraftServers: Map<string, string>
): Promise<void> {
@@ -130,43 +193,48 @@ async function syncMinecraftServers(
let existingCRs: any[] = [];
try {
const response = await customObjectsApi.listNamespacedCustomObject(
API_GROUP,
API_VERSION,
namespace,
"minecraftservers"
const response = await retryWithBackoff(
() =>
customObjectsApi.listNamespacedCustomObject({
group: API_GROUP,
version: API_VERSION,
namespace,
plural: "minecraftservers",
}),
{
maxRetries: 5,
initialDelay: 1000,
operationName: "List MinecraftServer CRs",
}
);
existingCRs = (response.body as any).items || [];
const listResponse = response as unknown as CustomResourceListResponse;
existingCRs = listResponse.body?.items || listResponse.items || [];
} catch (error) {
console.error("Error listing MinecraftServer CRs:", error);
// TODO: Potentially better error handling here
// For now, continue anyway - it might just be that none exist yet
logger.error(
{ err: error },
"Failed to list MinecraftServer custom resources, assuming none exist"
);
existingCRs = [];
}
// Map CR names to their corresponding DB IDs
const existingCRMap = new Map<string, string>();
// Map of CR names to their resourceVersions for updates
const crResourceVersions = new Map<string, string>();
for (const cr of existingCRs) {
const internalId = cr.status?.internalId;
if (internalId) {
existingCRMap.set(internalId, cr.metadata.name);
// Store the resourceVersion for later
if (cr.metadata?.resourceVersion) {
crResourceVersions.set(cr.metadata.name, cr.metadata.resourceVersion);
}
}
}
// Refresh tracking map
reflectedMinecraftServers.clear();
// Create or update CRs for each server
for (const server of servers) {
const crName = existingCRMap.get(server.id) || `${server.id.toLowerCase()}`;
// Build the CR object
const serverCR: {
apiVersion: string;
kind: string;
@@ -194,97 +262,148 @@ async function syncMinecraftServers(
description: server.description,
listen_port: server.listen_port,
type: server.type,
memory: server.memory,
memory: `${server.memory}M`,
},
status: {
phase: "Running",
message: "Managed by database",
internalId: server.id,
apiKey: "[REDACTED]", // Don't expose actual API key
apiKey: "[REDACTED]",
lastSyncedAt: new Date().toISOString(),
},
};
try {
if (existingCRMap.has(server.id)) {
// Update existing CR
// Get the current resource first
const crName = existingCRMap.get(server.id)!;
const existingCRName = existingCRMap.get(server.id);
if (existingCRName) {
try {
// Get the existing resource to get the current resourceVersion
const existingResource = await customObjectsApi.getNamespacedCustomObject(
API_GROUP,
API_VERSION,
const existingResource = await customObjectsApi.getNamespacedCustomObject({
group: API_GROUP,
version: API_VERSION,
namespace,
"minecraftservers",
crName
);
plural: "minecraftservers",
name: existingCRName,
});
// Extract the resourceVersion from the existing resource
if (existingResource?.body?.metadata?.resourceVersion) {
// Add the resourceVersion to our custom resource
serverCR.metadata.resourceVersion = existingResource.body.metadata.resourceVersion;
const resourceResponse = existingResource as CustomResourceResponse;
const resource = resourceResponse.body || resourceResponse;
if (resource?.metadata?.resourceVersion) {
serverCR.metadata.resourceVersion = resource.metadata.resourceVersion;
}
// Now update with the correct resourceVersion
await customObjectsApi.replaceNamespacedCustomObject(
API_GROUP,
API_VERSION,
await customObjectsApi.replaceNamespacedCustomObject({
group: API_GROUP,
version: API_VERSION,
namespace,
"minecraftservers",
crName,
serverCR
plural: "minecraftservers",
name: existingCRName,
body: serverCR,
});
logger.debug(
{ crName: existingCRName, serverId: server.id },
"Updated MinecraftServer custom resource"
);
console.log(`Updated MinecraftServer CR ${crName} for server ${server.id}`);
} catch (error) {
console.error(`Error getting/updating MinecraftServer CR for ${server.id}:`, error);
logger.error(
{ err: error, serverId: server.id },
"Failed to get/update MinecraftServer custom resource"
);
}
} else {
// Create new CR
await customObjectsApi.createNamespacedCustomObject(
API_GROUP,
API_VERSION,
namespace,
"minecraftservers",
serverCR
);
console.log(`Created MinecraftServer CR ${crName} for server ${server.id}`);
try {
await customObjectsApi.createNamespacedCustomObject({
group: API_GROUP,
version: API_VERSION,
namespace,
plural: "minecraftservers",
body: serverCR,
});
logger.debug(
{ crName, serverId: server.id },
"Created MinecraftServer custom resource"
);
} catch (createError: any) {
if (createError.code === 409) {
logger.debug({ crName }, "MinecraftServer CR already exists, updating instead");
try {
const existingResource = await customObjectsApi.getNamespacedCustomObject({
group: API_GROUP,
version: API_VERSION,
namespace,
plural: "minecraftservers",
name: crName,
});
const resourceResponse = existingResource as CustomResourceResponse;
let resource = resourceResponse.body || resourceResponse;
if (!resource?.metadata && resourceResponse.metadata) {
resource = resourceResponse;
}
if (resource?.metadata?.resourceVersion) {
serverCR.metadata.resourceVersion = resource.metadata.resourceVersion;
await customObjectsApi.replaceNamespacedCustomObject({
group: API_GROUP,
version: API_VERSION,
namespace,
plural: "minecraftservers",
name: crName,
body: serverCR,
});
logger.debug(
{ crName, serverId: server.id },
"Updated existing MinecraftServer custom resource"
);
} else {
logger.error({ crName }, "Cannot update CR: no resourceVersion in response");
}
} catch (updateError) {
logger.error(
{ err: updateError, crName },
"Failed to update MinecraftServer custom resource"
);
throw updateError;
}
} else {
throw createError;
}
}
}
// Remember this mapping
reflectedMinecraftServers.set(server.id, crName);
} catch (error) {
console.error(`Error creating/updating MinecraftServer CR for ${server.id}:`, error);
logger.error(
{ err: error, serverId: server.id },
"Failed to create/update MinecraftServer custom resource"
);
}
}
// Delete CRs for servers that no longer exist
for (const [dbId, crName] of existingCRMap.entries()) {
if (!servers.some((s) => s.id === dbId)) {
try {
await customObjectsApi.deleteNamespacedCustomObject(
API_GROUP,
API_VERSION,
await customObjectsApi.deleteNamespacedCustomObject({
group: API_GROUP,
version: API_VERSION,
namespace,
"minecraftservers",
crName
plural: "minecraftservers",
name: crName,
});
logger.info(
{ crName, serverId: dbId },
"Deleted MinecraftServer CR for removed database record"
);
console.log(`Deleted MinecraftServer CR ${crName} for removed server ID ${dbId}`);
} catch (error) {
console.error(`Error deleting MinecraftServer CR ${crName}:`, error);
logger.error({ err: error, crName }, "Failed to delete MinecraftServer custom resource");
}
}
}
} catch (error) {
console.error("Error syncing Minecraft servers to CRDs:", error);
logger.error({ err: error }, "Failed to sync Minecraft servers to custom resources");
}
}
/**
* Synchronizes Reverse Proxy server objects from the database to CRDs
*/
async function syncReverseProxyServers(
prisma: PrismaClient,
customObjectsApi: any,
@@ -300,43 +419,48 @@ async function syncReverseProxyServers(
let existingCRs: any[] = [];
try {
const response = await customObjectsApi.listNamespacedCustomObject(
API_GROUP,
API_VERSION,
namespace,
"reverseproxyservers"
const response = await retryWithBackoff(
() =>
customObjectsApi.listNamespacedCustomObject({
group: API_GROUP,
version: API_VERSION,
namespace,
plural: "reverseproxyservers",
}),
{
maxRetries: 5,
initialDelay: 1000,
operationName: "List ReverseProxyServer CRs",
}
);
existingCRs = (response.body as any).items || [];
const listResponse = response as unknown as CustomResourceListResponse;
existingCRs = listResponse.body?.items || listResponse.items || [];
} catch (error) {
console.error("Error listing ReverseProxyServer CRs:", error);
// TODO: Potentially better error handling here
// For now, continue anyway - it might just be that none exist yet
logger.error(
{ err: error },
"Failed to list ReverseProxyServer custom resources, assuming none exist"
);
existingCRs = [];
}
// Map CR names to their corresponding DB IDs
const existingCRMap = new Map<string, string>();
// Map of CR names to their resourceVersions for updates
const crResourceVersions = new Map<string, string>();
for (const cr of existingCRs) {
const internalId = cr.status?.internalId;
if (internalId) {
existingCRMap.set(internalId, cr.metadata.name);
// Store the resourceVersion for later
if (cr.metadata?.resourceVersion) {
crResourceVersions.set(cr.metadata.name, cr.metadata.resourceVersion);
}
}
}
// Refresh tracking map
reflectedReverseProxyServers.clear();
// Create or update CRs for each proxy
for (const proxy of proxies) {
const crName = existingCRMap.get(proxy.id) || `${proxy.id.toLowerCase()}`;
// Build the CR object
const proxyCR: {
apiVersion: string;
kind: string;
@@ -366,7 +490,7 @@ async function syncReverseProxyServers(
external_port: proxy.external_port,
listen_port: proxy.listen_port,
type: proxy.type,
memory: proxy.memory,
memory: `${proxy.memory}M`,
environmentVariables: proxy.env_variables?.map((ev) => ({
key: ev.key,
value: ev.value,
@@ -376,84 +500,134 @@ async function syncReverseProxyServers(
phase: "Running",
message: "Managed by database",
internalId: proxy.id,
apiKey: "[REDACTED]", // Don't expose actual API key
apiKey: "[REDACTED]",
lastSyncedAt: new Date().toISOString(),
},
};
try {
if (existingCRMap.has(proxy.id)) {
// Update existing CR
// Get the current resource first
const crName = existingCRMap.get(proxy.id)!;
const existingCRName = existingCRMap.get(proxy.id);
if (existingCRName) {
try {
// Get the existing resource to get the current resourceVersion
const existingResource = await customObjectsApi.getNamespacedCustomObject(
API_GROUP,
API_VERSION,
const existingResource = await customObjectsApi.getNamespacedCustomObject({
group: API_GROUP,
version: API_VERSION,
namespace,
"reverseproxyservers",
crName
);
plural: "reverseproxyservers",
name: existingCRName,
});
// Extract the resourceVersion from the existing resource
if (existingResource?.body?.metadata?.resourceVersion) {
// Add the resourceVersion to our custom resource
proxyCR.metadata.resourceVersion = existingResource.body.metadata.resourceVersion;
const resourceResponse = existingResource as CustomResourceResponse;
const resource = resourceResponse.body || resourceResponse;
if (resource?.metadata?.resourceVersion) {
proxyCR.metadata.resourceVersion = resource.metadata.resourceVersion;
}
// Now update with the correct resourceVersion
await customObjectsApi.replaceNamespacedCustomObject(
API_GROUP,
API_VERSION,
await customObjectsApi.replaceNamespacedCustomObject({
group: API_GROUP,
version: API_VERSION,
namespace,
"reverseproxyservers",
crName,
proxyCR
plural: "reverseproxyservers",
name: existingCRName,
body: proxyCR,
});
logger.debug(
{ crName: existingCRName, proxyId: proxy.id },
"Updated ReverseProxyServer custom resource"
);
console.log(`Updated ReverseProxyServer CR ${crName} for proxy ${proxy.id}`);
} catch (error) {
console.error(`Error getting/updating ReverseProxyServer CR for ${proxy.id}:`, error);
logger.error(
{ err: error, proxyId: proxy.id },
"Failed to get/update ReverseProxyServer custom resource"
);
}
} else {
// Create new CR
await customObjectsApi.createNamespacedCustomObject(
API_GROUP,
API_VERSION,
namespace,
"reverseproxyservers",
proxyCR
);
console.log(`Created ReverseProxyServer CR ${crName} for proxy ${proxy.id}`);
try {
await customObjectsApi.createNamespacedCustomObject({
group: API_GROUP,
version: API_VERSION,
namespace,
plural: "reverseproxyservers",
body: proxyCR,
});
logger.debug(
{ crName, proxyId: proxy.id },
"Created ReverseProxyServer custom resource"
);
} catch (createError: any) {
if (createError.code === 409) {
logger.debug({ crName }, "ReverseProxyServer CR already exists, updating instead");
try {
const existingResource = await customObjectsApi.getNamespacedCustomObject({
group: API_GROUP,
version: API_VERSION,
namespace,
plural: "reverseproxyservers",
name: crName,
});
const resource = existingResource.body as any;
if (resource?.metadata?.resourceVersion) {
proxyCR.metadata.resourceVersion = resource.metadata.resourceVersion;
}
await customObjectsApi.replaceNamespacedCustomObject({
group: API_GROUP,
version: API_VERSION,
namespace,
plural: "reverseproxyservers",
name: crName,
body: proxyCR,
});
logger.debug(
{ crName, proxyId: proxy.id },
"Updated existing ReverseProxyServer custom resource"
);
} catch (updateError) {
logger.error(
{ err: updateError, crName },
"Failed to update ReverseProxyServer custom resource"
);
throw updateError;
}
} else {
throw createError;
}
}
}
// Remember this mapping
reflectedReverseProxyServers.set(proxy.id, crName);
} catch (error) {
console.error(`Error creating/updating ReverseProxyServer CR for ${proxy.id}:`, error);
logger.error(
{ err: error, proxyId: proxy.id },
"Failed to create/update ReverseProxyServer custom resource"
);
}
}
// Delete CRs for proxies that no longer exist
for (const [dbId, crName] of existingCRMap.entries()) {
if (!proxies.some((p) => p.id === dbId)) {
try {
await customObjectsApi.deleteNamespacedCustomObject(
API_GROUP,
API_VERSION,
await customObjectsApi.deleteNamespacedCustomObject({
group: API_GROUP,
version: API_VERSION,
namespace,
"reverseproxyservers",
crName
plural: "reverseproxyservers",
name: crName,
});
logger.info(
{ crName, proxyId: dbId },
"Deleted ReverseProxyServer CR for removed database record"
);
console.log(`Deleted ReverseProxyServer CR ${crName} for removed proxy ID ${dbId}`);
} catch (error) {
console.error(`Error deleting ReverseProxyServer CR ${crName}:`, error);
logger.error(
{ err: error, crName },
"Failed to delete ReverseProxyServer custom resource"
);
}
}
}
} catch (error) {
console.error("Error syncing Reverse Proxy servers to CRDs:", error);
logger.error({ err: error }, "Failed to sync reverse proxy servers to custom resources");
}
}

View File

@@ -1,9 +0,0 @@
export const getErrorMessage = (error: unknown): string => {
if (error instanceof Error) {
return error.message;
}
if (typeof error === "string") {
return error;
}
return "Unknown error";
};

View File

@@ -1,5 +1,6 @@
import * as k8s from "@kubernetes/client-node";
import { SKIP_TLS_VERIFY, NAMESPACE } from "../config/constants";
import { buildKubeConfig } from "@minikura/shared/kube-auth";
import { logger } from "./logger";
export class KubernetesClient {
private static instance: KubernetesClient;
@@ -11,13 +12,7 @@ export class KubernetesClient {
private apiExtensionsApi!: k8s.ApiextensionsV1Api;
private constructor() {
if (SKIP_TLS_VERIFY) {
console.log("Disabling TLS certificate validation");
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
}
this.kc = new k8s.KubeConfig();
this.setupConfig();
this.kc = buildKubeConfig();
this.initializeClients();
}
@@ -28,34 +23,6 @@ export class KubernetesClient {
return KubernetesClient.instance;
}
private setupConfig(): void {
try {
this.kc.loadFromDefault();
console.log("Loaded Kubernetes config from default location");
} catch (err) {
console.warn("Failed to load Kubernetes config from default location:", err);
}
// Running in a cluster, try to load in-cluster config
if (!this.kc.getCurrentContext()) {
try {
this.kc.loadFromCluster();
console.log("Loaded Kubernetes config from cluster");
} catch (err) {
console.warn("Failed to load Kubernetes config from cluster:", err);
}
}
if (!this.kc.getCurrentContext()) {
throw new Error("Failed to setup Kubernetes client - no valid configuration found");
}
const currentCluster = this.kc.getCurrentCluster();
if (currentCluster) {
console.log(`Connecting to Kubernetes server: ${currentCluster.server}`);
}
}
private initializeClients(): void {
this.appsApi = this.kc.makeApiClient(k8s.AppsV1Api);
this.coreApi = this.kc.makeApiClient(k8s.CoreV1Api);
@@ -89,12 +56,15 @@ export class KubernetesClient {
}
async handleApiError(error: any, context: string): Promise<never> {
console.error(`Kubernetes API error (${context}):`, error?.message || error);
if (error?.response) {
console.error(`Response status: ${error.response.statusCode}`);
console.error(`Response body: ${JSON.stringify(error.response.body)}`);
}
logger.error(
{
context,
message: error?.message,
statusCode: error?.response?.statusCode,
body: error?.response?.body,
},
"Kubernetes API error"
);
throw error;
}

View File

@@ -1,108 +0,0 @@
import { existsSync, readFileSync, writeFileSync } from "node:fs";
import { KubeConfig } from "@kubernetes/client-node";
import { spawnSync } from "bun";
import YAML from "yaml";
type KubeConfigDoc = {
users?: Array<{ name: string; user: { token?: string } }>;
contexts?: Array<{
name: string;
context: { cluster: string; user: string; namespace?: string };
}>;
clusters?: Array<{ name: string }>;
};
import { NAMESPACE } from "../config/constants";
const SA_NAME = process.env.K8S_SA_NAME || "minikura-operator";
const TOKEN_DURATION_HOURS = Number(process.env.K8S_TOKEN_DURATION_HOURS || 24);
const TOKEN_REFRESH_MIN = Number(process.env.K8S_TOKEN_REFRESH_MIN || 60);
function kubeconfigPath(): string {
return process.env.KUBECONFIG || `${process.env.HOME || process.env.USERPROFILE}/.kube/config`;
}
function refreshSaToken(): void {
const duration = `${TOKEN_DURATION_HOURS}h`;
const args = ["kubectl", "-n", NAMESPACE, "create", "token", SA_NAME, "--duration", duration];
if (process.env.KUBERNETES_SKIP_TLS_VERIFY === "true") {
args.push("--insecure-skip-tls-verify");
}
const proc = spawnSync(args);
if (proc.exitCode !== 0) {
console.error("[kube-auth] kubectl create token failed:", proc.stderr.toString());
return;
}
const token = proc.stdout.toString().trim();
const kcPath = kubeconfigPath();
if (!existsSync(kcPath)) {
console.error("[kube-auth] kubeconfig not found at:", kcPath);
return;
}
const doc = YAML.parse(readFileSync(kcPath, "utf8")) as KubeConfigDoc;
let user = doc.users?.find((existingUser) => existingUser.name === SA_NAME);
if (!user) {
user = { name: SA_NAME, user: {} };
if (!doc.users) doc.users = [];
doc.users.push(user);
}
user.user = { token };
let ctx = doc.contexts?.find((context) => context.name === "bun-local-operator");
if (!ctx) {
const clusterName = doc.clusters?.[0]?.name || "default";
ctx = {
name: "bun-local-operator",
context: {
cluster: clusterName,
user: SA_NAME,
namespace: NAMESPACE,
},
};
if (!doc.contexts) doc.contexts = [];
doc.contexts.push(ctx);
} else {
ctx.context.user = SA_NAME;
ctx.context.namespace = NAMESPACE;
}
writeFileSync(kcPath, YAML.stringify(doc));
console.log(
`[kube-auth] kubeconfig updated with fresh token for ${SA_NAME} (expires in ${duration})`
);
}
export function buildKubeConfig(): KubeConfig {
const kc = new KubeConfig();
const isInCluster =
process.env.KUBERNETES_SERVICE_HOST &&
existsSync("/var/run/secrets/kubernetes.io/serviceaccount/token");
if (isInCluster) {
console.log("[kube-auth] Running in-cluster, loading from service account");
kc.loadFromCluster();
return kc;
}
console.log("[kube-auth] Running locally, using ServiceAccount token auth");
refreshSaToken();
setInterval(refreshSaToken, TOKEN_REFRESH_MIN * 60_000);
kc.loadFromDefault();
try {
kc.setCurrentContext("bun-local-operator");
} catch (error) {
console.warn("[kube-auth] Could not set bun-local-operator context, using default");
}
return kc;
}

View File

@@ -0,0 +1,5 @@
import { createLogger } from "@minikura/shared";
export { createLogger };
export const logger = createLogger("k8s-operator");

View File

@@ -1,84 +1,80 @@
import type { KubernetesClient } from "./k8s-client";
import fetch from "node-fetch";
import {
minikuraNamespace,
minikuraServiceAccount,
minikuraClusterRole,
minikuraClusterRoleBinding,
minikuraNamespace,
minikuraOperatorDeployment,
minikuraServiceAccount,
} from "../crds/rbac";
import fetch from "node-fetch";
import type { KubernetesClient } from "./k8s-client";
import { logger } from "./logger";
/**
* Registers all RBAC resources required
* @param k8sClient The Kubernetes client instance
*/
export async function registerRBACResources(k8sClient: KubernetesClient): Promise<void> {
try {
console.log("Starting RBAC resources registration...");
logger.info("Starting RBAC resources registration");
await registerNamespace(k8sClient);
await registerServiceAccount(k8sClient);
await registerClusterRole(k8sClient);
await registerClusterRoleBinding(k8sClient);
console.log("RBAC resources registration completed successfully");
logger.info("RBAC resources registration completed successfully");
} catch (error: any) {
console.error("Error registering RBAC resources:", error.message);
logger.error(
{
err: error,
message: error.message,
statusCode: error.response?.statusCode,
body: error.response?.body,
},
"Error registering RBAC resources"
);
if (error.response) {
console.error(`Response status: ${error.response.statusCode}`);
console.error(`Response body: ${JSON.stringify(error.response.body)}`);
}
throw error;
}
}
/**
* Registers the namespace
*/
async function registerNamespace(k8sClient: KubernetesClient): Promise<void> {
try {
const coreApi = k8sClient.getCoreApi();
await coreApi.createNamespace({ body: minikuraNamespace });
console.log(`Created namespace ${minikuraNamespace.metadata.name}`);
logger.info({ namespace: minikuraNamespace.metadata.name }, "Created namespace");
} catch (error: any) {
if (error.response?.statusCode === 409) {
console.log(`Namespace ${minikuraNamespace.metadata.name} already exists`);
if (error.code === 409) {
logger.debug({ namespace: minikuraNamespace.metadata.name }, "Namespace already exists");
} else {
throw error;
}
}
}
/**
* Registers the service account
*/
async function registerServiceAccount(k8sClient: KubernetesClient): Promise<void> {
try {
const coreApi = k8sClient.getCoreApi();
await coreApi.createNamespacedServiceAccount({
namespace: minikuraServiceAccount.metadata.namespace,
body: minikuraServiceAccount
body: minikuraServiceAccount,
});
console.log(`Created service account ${minikuraServiceAccount.metadata.name}`);
logger.info(
{ serviceAccount: minikuraServiceAccount.metadata.name },
"Created service account"
);
} catch (error: any) {
if (error.response?.statusCode === 409) {
console.log(`Service account ${minikuraServiceAccount.metadata.name} already exists`);
if (error.code === 409) {
logger.debug(`Service account ${minikuraServiceAccount.metadata.name} already exists`);
} else {
throw error;
}
}
}
/**
* Registers the cluster role
*/
async function registerClusterRole(k8sClient: KubernetesClient): Promise<void> {
try {
const kc = k8sClient.getKubeConfig();
const opts: any = {};
await kc.applyToHTTPSOptions(opts);
// Get cluster URL
const cluster = kc.getCurrentCluster();
if (!cluster) {
throw new Error("No active cluster found in KubeConfig");
@@ -99,9 +95,9 @@ async function registerClusterRole(k8sClient: KubernetesClient): Promise<void> {
);
if (response.ok) {
console.log(`Created cluster role ${minikuraClusterRole.metadata.name}`);
logger.debug(`Created cluster role ${minikuraClusterRole.metadata.name}`);
} else if (response.status === 409) {
console.log(`Cluster role ${minikuraClusterRole.metadata.name} already exists`);
logger.debug(`Cluster role ${minikuraClusterRole.metadata.name} already exists`);
} else {
const text = await response.text();
throw new Error(
@@ -109,35 +105,29 @@ async function registerClusterRole(k8sClient: KubernetesClient): Promise<void> {
);
}
} catch (error: any) {
// If the error message contains "already exists", that's OK
if (error.message?.includes("already exists") || error.message?.includes("409")) {
console.log(`Cluster role ${minikuraClusterRole.metadata.name} already exists`);
logger.debug(`Cluster role ${minikuraClusterRole.metadata.name} already exists`);
} else {
throw error;
}
}
} catch (error: any) {
console.error(`Error registering cluster role:`, error.message);
logger.error(`Error registering cluster role:`, error.message);
throw error;
}
}
/**
* Registers the Minikura cluster role binding
*/
async function registerClusterRoleBinding(k8sClient: KubernetesClient): Promise<void> {
try {
const kc = k8sClient.getKubeConfig();
const opts: any = {};
await kc.applyToHTTPSOptions(opts);
// Get cluster URL
const cluster = kc.getCurrentCluster();
if (!cluster) {
throw new Error("No active cluster found in KubeConfig");
}
// Create the cluster role binding
const { default: fetch } = await import("node-fetch");
try {
@@ -155,9 +145,9 @@ async function registerClusterRoleBinding(k8sClient: KubernetesClient): Promise<
);
if (response.ok) {
console.log(`Created cluster role binding ${minikuraClusterRoleBinding.metadata.name}`);
logger.debug(`Created cluster role binding ${minikuraClusterRoleBinding.metadata.name}`);
} else if (response.status === 409) {
console.log(
logger.debug(
`Cluster role binding ${minikuraClusterRoleBinding.metadata.name} already exists`
);
} else {
@@ -167,10 +157,8 @@ async function registerClusterRoleBinding(k8sClient: KubernetesClient): Promise<
);
}
} catch (error: any) {
// If the error message contains "already exists"
// TODO: Potentially better error handling here
if (error.message?.includes("already exists") || error.message?.includes("409")) {
console.log(
logger.debug(
`Cluster role binding ${minikuraClusterRoleBinding.metadata.name} already exists`
);
} else {
@@ -178,44 +166,36 @@ async function registerClusterRoleBinding(k8sClient: KubernetesClient): Promise<
}
}
} catch (error: any) {
console.error(`Error registering cluster role binding:`, error.message);
logger.error(`Error registering cluster role binding:`, error.message);
throw error;
}
}
/**
* Registers the Minikura operator deployment
* Note: This requires the secret to be created first
*/
export async function registerOperatorDeployment(
k8sClient: KubernetesClient,
registryUrl: string
): Promise<void> {
try {
// Replace the registry URL placeholder, for future use
const deployment = JSON.parse(
JSON.stringify(minikuraOperatorDeployment).replace("${REGISTRY_URL}", registryUrl)
);
const appsApi = k8sClient.getAppsApi();
await appsApi.createNamespacedDeployment(deployment.metadata.namespace, deployment);
console.log(`Created deployment ${deployment.metadata.name}`);
logger.debug(`Created deployment ${deployment.metadata.name}`);
} catch (error: any) {
if (error.response?.statusCode === 409) {
console.log(`Deployment ${minikuraOperatorDeployment.metadata.name} already exists`);
// Update the deployment if it already exists
if (error.code === 409) {
logger.debug(`Deployment ${minikuraOperatorDeployment.metadata.name} already exists`);
const deployment = JSON.parse(
JSON.stringify(minikuraOperatorDeployment).replace("${REGISTRY_URL}", registryUrl)
);
await k8sClient
.getAppsApi()
.replaceNamespacedDeployment({
name: deployment.metadata.name,
namespace: deployment.metadata.namespace,
body: deployment
});
console.log(`Updated deployment ${deployment.metadata.name}`);
await k8sClient.getAppsApi().replaceNamespacedDeployment({
name: deployment.metadata.name,
namespace: deployment.metadata.namespace,
body: deployment,
});
logger.debug(`Updated deployment ${deployment.metadata.name}`);
} else {
throw error;
}