diff --git a/plugins/MinikuraVelocity/src/main/kotlin/cafe/kirameki/minikuraVelocity/Main.kt b/plugins/MinikuraVelocity/src/main/kotlin/cafe/kirameki/minikuraVelocity/Main.kt index 5ecf37e..8183b07 100644 --- a/plugins/MinikuraVelocity/src/main/kotlin/cafe/kirameki/minikuraVelocity/Main.kt +++ b/plugins/MinikuraVelocity/src/main/kotlin/cafe/kirameki/minikuraVelocity/Main.kt @@ -1,9 +1,10 @@ package cafe.kirameki.minikuraVelocity -import cafe.kirameki.minikuraVelocity.listeners.ServerConnectionHandler +import cafe.kirameki.minikuraVelocity.listeners.ProxyTransferHandler import cafe.kirameki.minikuraVelocity.models.ReverseProxyServerData import cafe.kirameki.minikuraVelocity.models.ServerData import cafe.kirameki.minikuraVelocity.store.ServerDataStore +import cafe.kirameki.minikuraVelocity.utils.ProxyTransferUtils import cafe.kirameki.minikuraVelocity.utils.createWebSocketClient import com.google.gson.Gson import com.google.gson.reflect.TypeToken @@ -13,6 +14,7 @@ import com.velocitypowered.api.command.CommandMeta import com.velocitypowered.api.command.SimpleCommand import com.velocitypowered.api.event.Subscribe import com.velocitypowered.api.event.proxy.ProxyInitializeEvent +import com.velocitypowered.api.plugin.Dependency import com.velocitypowered.api.plugin.Plugin import com.velocitypowered.api.proxy.ProxyServer import com.velocitypowered.api.proxy.server.RegisteredServer @@ -23,39 +25,39 @@ import okhttp3.Request import org.slf4j.Logger import java.net.InetSocketAddress import java.util.concurrent.Executors +import java.util.concurrent.atomic.AtomicBoolean -@Plugin(id = "minikura-velocity", name = "MinikuraVelocity", version = "1.0") + +@Plugin( + id = "minikura-velocity", + name = "MinikuraVelocity", + version = "1.0", + dependencies = [ + Dependency(id = "redisbungee") + ] +) class Main @Inject constructor(private val logger: Logger, private val server: ProxyServer) { private val servers: MutableMap = HashMap() private val client = OkHttpClient() private val apiKey: String = System.getenv("MINIKURA_API_KEY") ?: "" private val apiUrl: String = System.getenv("MINIKURA_API_URL") ?: "http://localhost:3000" private val websocketUrl: String = System.getenv("MINIKURA_WEBSOCKET_URL") ?: "ws://localhost:3000/ws" + private var acceptingTransfers = AtomicBoolean(false) @Subscribe fun onProxyInitialization(event: ProxyInitializeEvent?) { logger.info("Minikura-Velocity is initializing...") - val client = createWebSocketClient(this, server, websocketUrl) + ProxyTransferUtils.server = server + ProxyTransferUtils.plugin = this + ProxyTransferUtils.logger = logger + ProxyTransferUtils.acceptingTransfers = acceptingTransfers + + val client = createWebSocketClient(this, logger, server, websocketUrl) client.connect() val commandManager: CommandManager = server.commandManager - val serversCommandMeta: CommandMeta = commandManager.metaBuilder("servers") - .plugin(this) - .aliases("listservers", "serverlist") - .build() - - val serversCommand = SimpleCommand { p -> - val source = p.source() - source.sendMessage(Component.text("Available servers:")) - for ((name, server) in servers) { - source.sendMessage(Component.text(" - $name (${server.serverInfo.address})")) - } - } - - commandManager.register(serversCommandMeta, serversCommand) - val refreshCommandMeta: CommandMeta = commandManager.metaBuilder("refresh") .plugin(this) .build() @@ -63,12 +65,21 @@ class Main @Inject constructor(private val logger: Logger, private val server: P val refreshCommand = SimpleCommand { p -> val source = p.source() source.sendMessage(Component.text("Refreshing server list...")) - fetchServers() - fetchReverseProxyServers() - source.sendMessage(Component.text("Server list refreshed successfully!")) + + Executors.newSingleThreadExecutor().submit { + fetchServers() + fetchReverseProxyServers() + source.sendMessage(Component.text("Server list refreshed successfully!")) + } } - commandManager.register(refreshCommandMeta, refreshCommand) + val serversCommandMeta: CommandMeta = commandManager.metaBuilder("server") + .plugin(this) + .build() + + val endCommandMeta: CommandMeta = commandManager.metaBuilder("end") + .plugin(this) + .build() val migrateCommandMeta: CommandMeta = commandManager.metaBuilder("migrate") .plugin(this) @@ -91,17 +102,24 @@ class Main @Inject constructor(private val logger: Logger, private val server: P return@SimpleCommand } - migratePlayersToServer(targetServer) + ProxyTransferUtils.migratePlayersToServer(targetServer) source.sendMessage(Component.text("Migrating players to server '$targetServerName'...")) } + commandManager.register(serversCommandMeta, ServerCommand.createServerCommand(server)) + commandManager.register(endCommandMeta, EndCommand.createEndCommand(server)) + commandManager.register(refreshCommandMeta, refreshCommand) commandManager.register(migrateCommandMeta, migrateCommand) - val connectionHandler = ServerConnectionHandler(servers, logger) + val connectionHandler = ProxyTransferHandler(servers, logger) server.eventManager.register(this, connectionHandler) - fetchServers() - fetchReverseProxyServers() + Executors.newSingleThreadExecutor().submit { + fetchServers() + fetchReverseProxyServers() + acceptingTransfers.set(true) + logger.info("Ready to accept player new connections/proxy transfers.") + } logger.info("Minikura-Velocity has been initialized.") } @@ -112,25 +130,23 @@ class Main @Inject constructor(private val logger: Logger, private val server: P .header("Authorization", "Bearer $apiKey") .build() - Executors.newSingleThreadExecutor().submit { - try { - val response = client.newCall(request).execute() - if (response.isSuccessful) { - val responseBody = response.body?.string() - val fetchedServers = parseReverseProxyServersData(responseBody) + try { + val response = client.newCall(request).execute() + if (response.isSuccessful) { + val responseBody = response.body?.string() + val fetchedServers = parseReverseProxyServersData(responseBody) - server.scheduler.buildTask(this, Runnable { - synchronized(ServerDataStore) { - ServerDataStore.clearReverseProxyServers() - ServerDataStore.addAllReverseProxyServers(fetchedServers) - } - }).schedule() - } else { - logger.error("Failed to fetch reverse proxy servers: ${response.message}") - } - } catch (e: Exception) { - logger.error("Error fetching reverse proxy servers: ${e.message}", e) + server.scheduler.buildTask(this, Runnable { + synchronized(ServerDataStore) { + ServerDataStore.clearReverseProxyServers() + ServerDataStore.addAllReverseProxyServers(fetchedServers) + } + }).schedule() + } else { + logger.error("Failed to fetch reverse proxy servers: ${response.message}") } + } catch (e: Exception) { + logger.error("Error fetching reverse proxy servers: ${e.message}", e) } } @@ -141,26 +157,24 @@ class Main @Inject constructor(private val logger: Logger, private val server: P .header("Authorization", "Bearer $apiKey") .build() - Executors.newSingleThreadExecutor().submit { - try { - val response = client.newCall(request).execute() - if (response.isSuccessful) { - val responseBody = response.body?.string() - val fetchedServers = parseServersData(responseBody) + try { + val response = client.newCall(request).execute() + if (response.isSuccessful) { + val responseBody = response.body?.string() + val fetchedServers = parseServersData(responseBody) - server.scheduler.buildTask(this, Runnable { - synchronized(ServerDataStore) { - ServerDataStore.clearServers() - ServerDataStore.addAllServers(fetchedServers) - } - populateServers(fetchedServers) - }).schedule() - } else { - logger.error("Failed to fetch servers: ${response.message}") - } - } catch (e: Exception) { - logger.error("Error fetching servers: ${e.message}", e) + server.scheduler.buildTask(this, Runnable { + synchronized(ServerDataStore) { + ServerDataStore.clearServers() + ServerDataStore.addAllServers(fetchedServers) + } + populateServers(fetchedServers) + }).schedule() + } else { + logger.error("Failed to fetch servers: ${response.message}") } + } catch (e: Exception) { + logger.error("Error fetching servers: ${e.message}", e) } } @@ -192,12 +206,4 @@ class Main @Inject constructor(private val logger: Logger, private val server: P } } - private fun migratePlayersToServer(targetServer: ReverseProxyServerData) { - val targetAddress = InetSocketAddress(targetServer.address, targetServer.port) - - server.allPlayers.forEach { player -> - player.transferToHost(targetAddress) - } - } - } diff --git a/plugins/MinikuraVelocity/src/main/kotlin/cafe/kirameki/minikuraVelocity/listeners/ProxyTransferHandler.kt b/plugins/MinikuraVelocity/src/main/kotlin/cafe/kirameki/minikuraVelocity/listeners/ProxyTransferHandler.kt new file mode 100644 index 0000000..462aead --- /dev/null +++ b/plugins/MinikuraVelocity/src/main/kotlin/cafe/kirameki/minikuraVelocity/listeners/ProxyTransferHandler.kt @@ -0,0 +1,130 @@ +package cafe.kirameki.minikuraVelocity.listeners + +import cafe.kirameki.minikuraVelocity.store.ServerDataStore +import com.auth0.jwt.JWT +import com.auth0.jwt.algorithms.Algorithm +import com.imaginarycode.minecraft.redisbungee.RedisBungeeAPI +import com.velocitypowered.api.event.Subscribe +import com.velocitypowered.api.event.player.CookieReceiveEvent +import com.velocitypowered.api.event.player.PlayerChooseInitialServerEvent +import com.velocitypowered.api.proxy.server.RegisteredServer +import net.kyori.adventure.key.Key +import org.slf4j.Logger +import java.util.* +import java.util.concurrent.CompletableFuture +import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException + +class ProxyTransferHandler( + private val servers: Map, + private val logger: Logger +) { + private val cookieFutures = mutableMapOf>() + private var jwtSecret = System.getenv("MINIKURA_JWT_SECRET") ?: "secret" + private val jwtAlgorithm = Algorithm.HMAC256(jwtSecret) + private val jwtVerifier = JWT.require(jwtAlgorithm).withIssuer("minikura").build() + private val redisBungeeApi = RedisBungeeAPI.getRedisBungeeApi() + + @Subscribe + fun onCookieReceiveEvent(event: CookieReceiveEvent) { + // TODO: Fix don't pass the cookie to backend, it kicks with invalid packet for some reason + if (event.originalKey == null || event.originalKey.toString() != "minikura:transfer_packet") { + event.result = CookieReceiveEvent.ForwardResult.handled() + return + } + + val future = cookieFutures[event.player.uniqueId.toString()] + if (event.originalData == null) { + future?.complete(null) + event.result = CookieReceiveEvent.ForwardResult.handled() + return + } + + try { + val stringData = String(event.originalData as ByteArray) + val decodedJwt = jwtVerifier.verify(stringData) + val server = decodedJwt.claims["server"]?.asString() + val uuid = decodedJwt.claims["uuid"]?.asString() + val origin = decodedJwt.claims["origin"]?.asString() + + if (decodedJwt.expiresAt.before(Date())) { + future?.complete(null) + logger.warn("Received expired proxy transfer request for ${event.player.username}") + event.result = CookieReceiveEvent.ForwardResult.handled() + return + } + + if (uuid != event.player.uniqueId.toString()) { + future?.complete(null) + logger.warn("Received mismatched UUID proxy transfer request for ${event.player.username} (expected ${event.player.uniqueId}, got $uuid)") + event.result = CookieReceiveEvent.ForwardResult.handled() + return + } + + if (server == null) { + future?.complete(null) + logger.warn("Received invalid proxy transfer request for ${event.player.username}") + event.result = CookieReceiveEvent.ForwardResult.handled() + return + } + + if (origin != null && !redisBungeeApi.allProxies.contains(origin)) { + future?.complete(null) + logger.warn("Received invalid origin proxy transfer request for ${event.player.username}") + event.result = CookieReceiveEvent.ForwardResult.handled() + return + } + + future?.complete(server) + logger.info("Accepted proxy transfer request $origin -> ${redisBungeeApi.proxyId} for ${event.player.username} -> $server") + event.result = CookieReceiveEvent.ForwardResult.handled() + } catch (e: Exception) { + future?.complete(null) + logger.error("Error verifying proxy transfer request for ${event.player.username}: ${e.message}") + } + } + + @Subscribe + fun onPlayerChooseInitialServer(event: PlayerChooseInitialServerEvent) { + val player = event.player + val currentServer = event.initialServer; + + player.requestCookie(Key.key("minikura:transfer_packet")) + + val future = CompletableFuture() + cookieFutures[player.uniqueId.toString()] = future + + // TODO: Check if player transferred with intent id of 3 (transfer) + // TODO: Can't seem to find a way to get the intent id from the event + try { + val serverName = future.get(3, TimeUnit.SECONDS) + + if (serverName != null && servers.containsKey(serverName)) { + val targetServer = servers[serverName] + if (targetServer != null) { + event.setInitialServer(targetServer) + return + } + } + } catch (e: TimeoutException) { + logger.warn("Timeout checking for proxy transfer request for ${player.username}") + } finally { + cookieFutures.remove(player.uniqueId.toString()) + } + + val sortedServers = ServerDataStore.getServers() + .filter { it.join_priority != null } + .sortedBy { it.join_priority } + .mapNotNull { servers[it.name] } + + for (server in sortedServers) { + if (server != currentServer) { + logger.info("Sending ${player.username} -> ${server.serverInfo.name}") + event.setInitialServer(server) + return + } + } + + logger.warn("No available servers with valid join_priority for ${player.username}") + } +} \ No newline at end of file diff --git a/plugins/MinikuraVelocity/src/main/kotlin/cafe/kirameki/minikuraVelocity/listeners/ServerConnectionHandler.kt b/plugins/MinikuraVelocity/src/main/kotlin/cafe/kirameki/minikuraVelocity/listeners/ServerConnectionHandler.kt deleted file mode 100644 index 2ae9505..0000000 --- a/plugins/MinikuraVelocity/src/main/kotlin/cafe/kirameki/minikuraVelocity/listeners/ServerConnectionHandler.kt +++ /dev/null @@ -1,35 +0,0 @@ -package cafe.kirameki.minikuraVelocity.listeners - -import cafe.kirameki.minikuraVelocity.store.ServerDataStore -import com.velocitypowered.api.event.Subscribe -import com.velocitypowered.api.event.player.PlayerChooseInitialServerEvent -import com.velocitypowered.api.proxy.server.RegisteredServer -import org.slf4j.Logger - -class ServerConnectionHandler( - private val servers: Map, - private val logger: Logger -) { - @Subscribe - fun onPlayerChooseInitialServer(event: PlayerChooseInitialServerEvent) { - val player = event.player - val currentServer = event.initialServer; - - val sortedServers = ServerDataStore.getServers() - .filter { it.join_priority != null } - .sortedBy { it.join_priority } - .mapNotNull { servers[it.name] } - - System.out.println("Sorted servers: ${sortedServers.map { it.serverInfo.name }}") - - for (server in sortedServers) { - if (server != currentServer) { - logger.info("Attempting to connect ${player.username} to server ${server.serverInfo.name}") - event.setInitialServer(server) - return - } - } - - logger.warn("No available servers with valid join_priority for ${player.username}") - } -} \ No newline at end of file diff --git a/plugins/MinikuraVelocity/src/main/kotlin/cafe/kirameki/minikuraVelocity/utils/ProxyTransferUtils.kt b/plugins/MinikuraVelocity/src/main/kotlin/cafe/kirameki/minikuraVelocity/utils/ProxyTransferUtils.kt new file mode 100644 index 0000000..62cdc0e --- /dev/null +++ b/plugins/MinikuraVelocity/src/main/kotlin/cafe/kirameki/minikuraVelocity/utils/ProxyTransferUtils.kt @@ -0,0 +1,121 @@ +package cafe.kirameki.minikuraVelocity.utils + +import cafe.kirameki.minikuraVelocity.Main +import cafe.kirameki.minikuraVelocity.models.ReverseProxyServerData +import cafe.kirameki.minikuraVelocity.store.ServerDataStore +import com.auth0.jwt.JWT +import com.auth0.jwt.algorithms.Algorithm +import com.imaginarycode.minecraft.redisbungee.RedisBungeeAPI +import com.velocitypowered.api.proxy.ProxyServer +import com.velocitypowered.api.scheduler.ScheduledTask +import com.velocitypowered.api.scheduler.TaskStatus +import net.kyori.adventure.key.Key +import net.kyori.adventure.text.Component +import org.slf4j.Logger +import java.net.InetSocketAddress +import java.util.* +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean +import java.util.function.Consumer + +object ProxyTransferUtils { + private val redisBungeeApi = RedisBungeeAPI.getRedisBungeeApi() + private var jwtSecret = System.getenv("MINIKURA_JWT_SECRET") ?: "secret" + private val jwtAlgorithm = Algorithm.HMAC256(jwtSecret) + lateinit var server: ProxyServer + lateinit var plugin: Main + lateinit var logger: Logger + lateinit var acceptingTransfers: AtomicBoolean + + fun migratePlayersToServer(targetServer: ReverseProxyServerData, disconnectFailed : Boolean = false): ScheduledTask { + val targetAddress = InetSocketAddress(targetServer.address, targetServer.port) + val currentProxyName = redisBungeeApi.proxyId + + val players = server.allPlayers.toList() + val batchSize = (players.size * 0.05).coerceAtLeast(1.0).toInt() // 5% of players per batch to avoid overloading the server + + var currentIndex = 0 + + for (player in players) { + player.sendMessage(Component.text("Proxy transfer in progress...")) + } + + val taskId = server.scheduler.buildTask(plugin, Consumer { scheduledTask -> + if (currentIndex >= players.size) { + scheduledTask.cancel() + return@Consumer + } + + val batch = players.subList(currentIndex, (currentIndex + batchSize).coerceAtMost(players.size)) + currentIndex += batchSize + + batch.forEach { player -> + val currentServer = player.currentServer.orElse(null) + try { + if (currentServer == null || !player.isActive) return@forEach + + val token = JWT.create() + .withIssuer("minikura") + .withClaim("uuid", player.uniqueId.toString()) + .withClaim("server", currentServer.serverInfo.name) + .withClaim("origin", currentProxyName) + .withExpiresAt(Date(System.currentTimeMillis() + 60 * 5 * 1000)) // Token expires in 5 minutes + .sign(jwtAlgorithm) + + player.storeCookie(Key.key("minikura", "transfer_packet"), token.toByteArray()) + player.transferToHost(targetAddress) + + } catch (e: IllegalArgumentException) { + logger.error("Player ${player.username} client does not support transfers") + if (disconnectFailed) { + player.disconnect(Component.text("Failed to migrate you to the target server. Please reconnect.")) + } + } catch (e: Exception) { + logger.error("Error migrating player ${player.username} to server ${targetServer.name}: ${e.message}", e) + if (disconnectFailed) { + player.disconnect(Component.text("Failed to migrate you to the target server. Please reconnect.")) + } + } + } + }) + .delay(3, TimeUnit.SECONDS) + .repeat(3, TimeUnit.SECONDS) + .schedule() + + return taskId + } + + fun endProxy() { + acceptingTransfers.set(false) + logger.info("Proxy is being shut down. No new connections or transfers will be accepted.") + + val allProxies = redisBungeeApi.allProxies + .filter { it != redisBungeeApi.proxyId } + + if (allProxies.isEmpty()) { + for (player in server.allPlayers) { + player.disconnect(Component.text("The proxy is being shut down. No other proxies are available.")) + } + return + } + + // TODO: Load balance players across proxies + val nextProxy = allProxies.first() + val targetServer = ServerDataStore.getReverseProxyServer(nextProxy) + if (targetServer != null) { + val task = migratePlayersToServer(targetServer, true) + + server.scheduler.buildTask(plugin, Runnable { + if (task.status() != TaskStatus.FINISHED && task.status() != TaskStatus.CANCELLED) { + return@Runnable + } + + for (player in server.allPlayers) { + player.disconnect(Component.text("The proxy is being shut down. Please reconnect.")) + } + + server.shutdown() + }).repeat(5, TimeUnit.SECONDS).schedule() + } + } +} \ No newline at end of file