mirror of
https://github.com/YuzuZensai/Minikura.git
synced 2026-01-31 14:57:49 +00:00
✨ feat: proxy transfer
This commit is contained in:
@@ -1,9 +1,10 @@
|
|||||||
package cafe.kirameki.minikuraVelocity
|
package cafe.kirameki.minikuraVelocity
|
||||||
|
|
||||||
import cafe.kirameki.minikuraVelocity.listeners.ServerConnectionHandler
|
import cafe.kirameki.minikuraVelocity.listeners.ProxyTransferHandler
|
||||||
import cafe.kirameki.minikuraVelocity.models.ReverseProxyServerData
|
import cafe.kirameki.minikuraVelocity.models.ReverseProxyServerData
|
||||||
import cafe.kirameki.minikuraVelocity.models.ServerData
|
import cafe.kirameki.minikuraVelocity.models.ServerData
|
||||||
import cafe.kirameki.minikuraVelocity.store.ServerDataStore
|
import cafe.kirameki.minikuraVelocity.store.ServerDataStore
|
||||||
|
import cafe.kirameki.minikuraVelocity.utils.ProxyTransferUtils
|
||||||
import cafe.kirameki.minikuraVelocity.utils.createWebSocketClient
|
import cafe.kirameki.minikuraVelocity.utils.createWebSocketClient
|
||||||
import com.google.gson.Gson
|
import com.google.gson.Gson
|
||||||
import com.google.gson.reflect.TypeToken
|
import com.google.gson.reflect.TypeToken
|
||||||
@@ -13,6 +14,7 @@ import com.velocitypowered.api.command.CommandMeta
|
|||||||
import com.velocitypowered.api.command.SimpleCommand
|
import com.velocitypowered.api.command.SimpleCommand
|
||||||
import com.velocitypowered.api.event.Subscribe
|
import com.velocitypowered.api.event.Subscribe
|
||||||
import com.velocitypowered.api.event.proxy.ProxyInitializeEvent
|
import com.velocitypowered.api.event.proxy.ProxyInitializeEvent
|
||||||
|
import com.velocitypowered.api.plugin.Dependency
|
||||||
import com.velocitypowered.api.plugin.Plugin
|
import com.velocitypowered.api.plugin.Plugin
|
||||||
import com.velocitypowered.api.proxy.ProxyServer
|
import com.velocitypowered.api.proxy.ProxyServer
|
||||||
import com.velocitypowered.api.proxy.server.RegisteredServer
|
import com.velocitypowered.api.proxy.server.RegisteredServer
|
||||||
@@ -23,39 +25,39 @@ import okhttp3.Request
|
|||||||
import org.slf4j.Logger
|
import org.slf4j.Logger
|
||||||
import java.net.InetSocketAddress
|
import java.net.InetSocketAddress
|
||||||
import java.util.concurrent.Executors
|
import java.util.concurrent.Executors
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean
|
||||||
|
|
||||||
@Plugin(id = "minikura-velocity", name = "MinikuraVelocity", version = "1.0")
|
|
||||||
|
@Plugin(
|
||||||
|
id = "minikura-velocity",
|
||||||
|
name = "MinikuraVelocity",
|
||||||
|
version = "1.0",
|
||||||
|
dependencies = [
|
||||||
|
Dependency(id = "redisbungee")
|
||||||
|
]
|
||||||
|
)
|
||||||
class Main @Inject constructor(private val logger: Logger, private val server: ProxyServer) {
|
class Main @Inject constructor(private val logger: Logger, private val server: ProxyServer) {
|
||||||
private val servers: MutableMap<String, RegisteredServer> = HashMap()
|
private val servers: MutableMap<String, RegisteredServer> = HashMap()
|
||||||
private val client = OkHttpClient()
|
private val client = OkHttpClient()
|
||||||
private val apiKey: String = System.getenv("MINIKURA_API_KEY") ?: ""
|
private val apiKey: String = System.getenv("MINIKURA_API_KEY") ?: ""
|
||||||
private val apiUrl: String = System.getenv("MINIKURA_API_URL") ?: "http://localhost:3000"
|
private val apiUrl: String = System.getenv("MINIKURA_API_URL") ?: "http://localhost:3000"
|
||||||
private val websocketUrl: String = System.getenv("MINIKURA_WEBSOCKET_URL") ?: "ws://localhost:3000/ws"
|
private val websocketUrl: String = System.getenv("MINIKURA_WEBSOCKET_URL") ?: "ws://localhost:3000/ws"
|
||||||
|
private var acceptingTransfers = AtomicBoolean(false)
|
||||||
|
|
||||||
@Subscribe
|
@Subscribe
|
||||||
fun onProxyInitialization(event: ProxyInitializeEvent?) {
|
fun onProxyInitialization(event: ProxyInitializeEvent?) {
|
||||||
logger.info("Minikura-Velocity is initializing...")
|
logger.info("Minikura-Velocity is initializing...")
|
||||||
|
|
||||||
val client = createWebSocketClient(this, server, websocketUrl)
|
ProxyTransferUtils.server = server
|
||||||
|
ProxyTransferUtils.plugin = this
|
||||||
|
ProxyTransferUtils.logger = logger
|
||||||
|
ProxyTransferUtils.acceptingTransfers = acceptingTransfers
|
||||||
|
|
||||||
|
val client = createWebSocketClient(this, logger, server, websocketUrl)
|
||||||
client.connect()
|
client.connect()
|
||||||
|
|
||||||
val commandManager: CommandManager = server.commandManager
|
val commandManager: CommandManager = server.commandManager
|
||||||
|
|
||||||
val serversCommandMeta: CommandMeta = commandManager.metaBuilder("servers")
|
|
||||||
.plugin(this)
|
|
||||||
.aliases("listservers", "serverlist")
|
|
||||||
.build()
|
|
||||||
|
|
||||||
val serversCommand = SimpleCommand { p ->
|
|
||||||
val source = p.source()
|
|
||||||
source.sendMessage(Component.text("Available servers:"))
|
|
||||||
for ((name, server) in servers) {
|
|
||||||
source.sendMessage(Component.text(" - $name (${server.serverInfo.address})"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
commandManager.register(serversCommandMeta, serversCommand)
|
|
||||||
|
|
||||||
val refreshCommandMeta: CommandMeta = commandManager.metaBuilder("refresh")
|
val refreshCommandMeta: CommandMeta = commandManager.metaBuilder("refresh")
|
||||||
.plugin(this)
|
.plugin(this)
|
||||||
.build()
|
.build()
|
||||||
@@ -63,12 +65,21 @@ class Main @Inject constructor(private val logger: Logger, private val server: P
|
|||||||
val refreshCommand = SimpleCommand { p ->
|
val refreshCommand = SimpleCommand { p ->
|
||||||
val source = p.source()
|
val source = p.source()
|
||||||
source.sendMessage(Component.text("Refreshing server list..."))
|
source.sendMessage(Component.text("Refreshing server list..."))
|
||||||
fetchServers()
|
|
||||||
fetchReverseProxyServers()
|
Executors.newSingleThreadExecutor().submit {
|
||||||
source.sendMessage(Component.text("Server list refreshed successfully!"))
|
fetchServers()
|
||||||
|
fetchReverseProxyServers()
|
||||||
|
source.sendMessage(Component.text("Server list refreshed successfully!"))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
commandManager.register(refreshCommandMeta, refreshCommand)
|
val serversCommandMeta: CommandMeta = commandManager.metaBuilder("server")
|
||||||
|
.plugin(this)
|
||||||
|
.build()
|
||||||
|
|
||||||
|
val endCommandMeta: CommandMeta = commandManager.metaBuilder("end")
|
||||||
|
.plugin(this)
|
||||||
|
.build()
|
||||||
|
|
||||||
val migrateCommandMeta: CommandMeta = commandManager.metaBuilder("migrate")
|
val migrateCommandMeta: CommandMeta = commandManager.metaBuilder("migrate")
|
||||||
.plugin(this)
|
.plugin(this)
|
||||||
@@ -91,17 +102,24 @@ class Main @Inject constructor(private val logger: Logger, private val server: P
|
|||||||
return@SimpleCommand
|
return@SimpleCommand
|
||||||
}
|
}
|
||||||
|
|
||||||
migratePlayersToServer(targetServer)
|
ProxyTransferUtils.migratePlayersToServer(targetServer)
|
||||||
source.sendMessage(Component.text("Migrating players to server '$targetServerName'..."))
|
source.sendMessage(Component.text("Migrating players to server '$targetServerName'..."))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
commandManager.register(serversCommandMeta, ServerCommand.createServerCommand(server))
|
||||||
|
commandManager.register(endCommandMeta, EndCommand.createEndCommand(server))
|
||||||
|
commandManager.register(refreshCommandMeta, refreshCommand)
|
||||||
commandManager.register(migrateCommandMeta, migrateCommand)
|
commandManager.register(migrateCommandMeta, migrateCommand)
|
||||||
|
|
||||||
val connectionHandler = ServerConnectionHandler(servers, logger)
|
val connectionHandler = ProxyTransferHandler(servers, logger)
|
||||||
server.eventManager.register(this, connectionHandler)
|
server.eventManager.register(this, connectionHandler)
|
||||||
|
|
||||||
fetchServers()
|
Executors.newSingleThreadExecutor().submit {
|
||||||
fetchReverseProxyServers()
|
fetchServers()
|
||||||
|
fetchReverseProxyServers()
|
||||||
|
acceptingTransfers.set(true)
|
||||||
|
logger.info("Ready to accept player new connections/proxy transfers.")
|
||||||
|
}
|
||||||
|
|
||||||
logger.info("Minikura-Velocity has been initialized.")
|
logger.info("Minikura-Velocity has been initialized.")
|
||||||
}
|
}
|
||||||
@@ -112,25 +130,23 @@ class Main @Inject constructor(private val logger: Logger, private val server: P
|
|||||||
.header("Authorization", "Bearer $apiKey")
|
.header("Authorization", "Bearer $apiKey")
|
||||||
.build()
|
.build()
|
||||||
|
|
||||||
Executors.newSingleThreadExecutor().submit {
|
try {
|
||||||
try {
|
val response = client.newCall(request).execute()
|
||||||
val response = client.newCall(request).execute()
|
if (response.isSuccessful) {
|
||||||
if (response.isSuccessful) {
|
val responseBody = response.body?.string()
|
||||||
val responseBody = response.body?.string()
|
val fetchedServers = parseReverseProxyServersData(responseBody)
|
||||||
val fetchedServers = parseReverseProxyServersData(responseBody)
|
|
||||||
|
|
||||||
server.scheduler.buildTask(this, Runnable {
|
server.scheduler.buildTask(this, Runnable {
|
||||||
synchronized(ServerDataStore) {
|
synchronized(ServerDataStore) {
|
||||||
ServerDataStore.clearReverseProxyServers()
|
ServerDataStore.clearReverseProxyServers()
|
||||||
ServerDataStore.addAllReverseProxyServers(fetchedServers)
|
ServerDataStore.addAllReverseProxyServers(fetchedServers)
|
||||||
}
|
}
|
||||||
}).schedule()
|
}).schedule()
|
||||||
} else {
|
} else {
|
||||||
logger.error("Failed to fetch reverse proxy servers: ${response.message}")
|
logger.error("Failed to fetch reverse proxy servers: ${response.message}")
|
||||||
}
|
|
||||||
} catch (e: Exception) {
|
|
||||||
logger.error("Error fetching reverse proxy servers: ${e.message}", e)
|
|
||||||
}
|
}
|
||||||
|
} catch (e: Exception) {
|
||||||
|
logger.error("Error fetching reverse proxy servers: ${e.message}", e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -141,26 +157,24 @@ class Main @Inject constructor(private val logger: Logger, private val server: P
|
|||||||
.header("Authorization", "Bearer $apiKey")
|
.header("Authorization", "Bearer $apiKey")
|
||||||
.build()
|
.build()
|
||||||
|
|
||||||
Executors.newSingleThreadExecutor().submit {
|
try {
|
||||||
try {
|
val response = client.newCall(request).execute()
|
||||||
val response = client.newCall(request).execute()
|
if (response.isSuccessful) {
|
||||||
if (response.isSuccessful) {
|
val responseBody = response.body?.string()
|
||||||
val responseBody = response.body?.string()
|
val fetchedServers = parseServersData(responseBody)
|
||||||
val fetchedServers = parseServersData(responseBody)
|
|
||||||
|
|
||||||
server.scheduler.buildTask(this, Runnable {
|
server.scheduler.buildTask(this, Runnable {
|
||||||
synchronized(ServerDataStore) {
|
synchronized(ServerDataStore) {
|
||||||
ServerDataStore.clearServers()
|
ServerDataStore.clearServers()
|
||||||
ServerDataStore.addAllServers(fetchedServers)
|
ServerDataStore.addAllServers(fetchedServers)
|
||||||
}
|
}
|
||||||
populateServers(fetchedServers)
|
populateServers(fetchedServers)
|
||||||
}).schedule()
|
}).schedule()
|
||||||
} else {
|
} else {
|
||||||
logger.error("Failed to fetch servers: ${response.message}")
|
logger.error("Failed to fetch servers: ${response.message}")
|
||||||
}
|
|
||||||
} catch (e: Exception) {
|
|
||||||
logger.error("Error fetching servers: ${e.message}", e)
|
|
||||||
}
|
}
|
||||||
|
} catch (e: Exception) {
|
||||||
|
logger.error("Error fetching servers: ${e.message}", e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -192,12 +206,4 @@ class Main @Inject constructor(private val logger: Logger, private val server: P
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun migratePlayersToServer(targetServer: ReverseProxyServerData) {
|
|
||||||
val targetAddress = InetSocketAddress(targetServer.address, targetServer.port)
|
|
||||||
|
|
||||||
server.allPlayers.forEach { player ->
|
|
||||||
player.transferToHost(targetAddress)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,130 @@
|
|||||||
|
package cafe.kirameki.minikuraVelocity.listeners
|
||||||
|
|
||||||
|
import cafe.kirameki.minikuraVelocity.store.ServerDataStore
|
||||||
|
import com.auth0.jwt.JWT
|
||||||
|
import com.auth0.jwt.algorithms.Algorithm
|
||||||
|
import com.imaginarycode.minecraft.redisbungee.RedisBungeeAPI
|
||||||
|
import com.velocitypowered.api.event.Subscribe
|
||||||
|
import com.velocitypowered.api.event.player.CookieReceiveEvent
|
||||||
|
import com.velocitypowered.api.event.player.PlayerChooseInitialServerEvent
|
||||||
|
import com.velocitypowered.api.proxy.server.RegisteredServer
|
||||||
|
import net.kyori.adventure.key.Key
|
||||||
|
import org.slf4j.Logger
|
||||||
|
import java.util.*
|
||||||
|
import java.util.concurrent.CompletableFuture
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
|
import java.util.concurrent.TimeoutException
|
||||||
|
|
||||||
|
class ProxyTransferHandler(
|
||||||
|
private val servers: Map<String, RegisteredServer>,
|
||||||
|
private val logger: Logger
|
||||||
|
) {
|
||||||
|
private val cookieFutures = mutableMapOf<String, CompletableFuture<String>>()
|
||||||
|
private var jwtSecret = System.getenv("MINIKURA_JWT_SECRET") ?: "secret"
|
||||||
|
private val jwtAlgorithm = Algorithm.HMAC256(jwtSecret)
|
||||||
|
private val jwtVerifier = JWT.require(jwtAlgorithm).withIssuer("minikura").build()
|
||||||
|
private val redisBungeeApi = RedisBungeeAPI.getRedisBungeeApi()
|
||||||
|
|
||||||
|
@Subscribe
|
||||||
|
fun onCookieReceiveEvent(event: CookieReceiveEvent) {
|
||||||
|
// TODO: Fix don't pass the cookie to backend, it kicks with invalid packet for some reason
|
||||||
|
if (event.originalKey == null || event.originalKey.toString() != "minikura:transfer_packet") {
|
||||||
|
event.result = CookieReceiveEvent.ForwardResult.handled()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
val future = cookieFutures[event.player.uniqueId.toString()]
|
||||||
|
if (event.originalData == null) {
|
||||||
|
future?.complete(null)
|
||||||
|
event.result = CookieReceiveEvent.ForwardResult.handled()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
val stringData = String(event.originalData as ByteArray)
|
||||||
|
val decodedJwt = jwtVerifier.verify(stringData)
|
||||||
|
val server = decodedJwt.claims["server"]?.asString()
|
||||||
|
val uuid = decodedJwt.claims["uuid"]?.asString()
|
||||||
|
val origin = decodedJwt.claims["origin"]?.asString()
|
||||||
|
|
||||||
|
if (decodedJwt.expiresAt.before(Date())) {
|
||||||
|
future?.complete(null)
|
||||||
|
logger.warn("Received expired proxy transfer request for ${event.player.username}")
|
||||||
|
event.result = CookieReceiveEvent.ForwardResult.handled()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if (uuid != event.player.uniqueId.toString()) {
|
||||||
|
future?.complete(null)
|
||||||
|
logger.warn("Received mismatched UUID proxy transfer request for ${event.player.username} (expected ${event.player.uniqueId}, got $uuid)")
|
||||||
|
event.result = CookieReceiveEvent.ForwardResult.handled()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if (server == null) {
|
||||||
|
future?.complete(null)
|
||||||
|
logger.warn("Received invalid proxy transfer request for ${event.player.username}")
|
||||||
|
event.result = CookieReceiveEvent.ForwardResult.handled()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if (origin != null && !redisBungeeApi.allProxies.contains(origin)) {
|
||||||
|
future?.complete(null)
|
||||||
|
logger.warn("Received invalid origin proxy transfer request for ${event.player.username}")
|
||||||
|
event.result = CookieReceiveEvent.ForwardResult.handled()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
future?.complete(server)
|
||||||
|
logger.info("Accepted proxy transfer request $origin -> ${redisBungeeApi.proxyId} for ${event.player.username} -> $server")
|
||||||
|
event.result = CookieReceiveEvent.ForwardResult.handled()
|
||||||
|
} catch (e: Exception) {
|
||||||
|
future?.complete(null)
|
||||||
|
logger.error("Error verifying proxy transfer request for ${event.player.username}: ${e.message}")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Subscribe
|
||||||
|
fun onPlayerChooseInitialServer(event: PlayerChooseInitialServerEvent) {
|
||||||
|
val player = event.player
|
||||||
|
val currentServer = event.initialServer;
|
||||||
|
|
||||||
|
player.requestCookie(Key.key("minikura:transfer_packet"))
|
||||||
|
|
||||||
|
val future = CompletableFuture<String>()
|
||||||
|
cookieFutures[player.uniqueId.toString()] = future
|
||||||
|
|
||||||
|
// TODO: Check if player transferred with intent id of 3 (transfer)
|
||||||
|
// TODO: Can't seem to find a way to get the intent id from the event
|
||||||
|
try {
|
||||||
|
val serverName = future.get(3, TimeUnit.SECONDS)
|
||||||
|
|
||||||
|
if (serverName != null && servers.containsKey(serverName)) {
|
||||||
|
val targetServer = servers[serverName]
|
||||||
|
if (targetServer != null) {
|
||||||
|
event.setInitialServer(targetServer)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (e: TimeoutException) {
|
||||||
|
logger.warn("Timeout checking for proxy transfer request for ${player.username}")
|
||||||
|
} finally {
|
||||||
|
cookieFutures.remove(player.uniqueId.toString())
|
||||||
|
}
|
||||||
|
|
||||||
|
val sortedServers = ServerDataStore.getServers()
|
||||||
|
.filter { it.join_priority != null }
|
||||||
|
.sortedBy { it.join_priority }
|
||||||
|
.mapNotNull { servers[it.name] }
|
||||||
|
|
||||||
|
for (server in sortedServers) {
|
||||||
|
if (server != currentServer) {
|
||||||
|
logger.info("Sending ${player.username} -> ${server.serverInfo.name}")
|
||||||
|
event.setInitialServer(server)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.warn("No available servers with valid join_priority for ${player.username}")
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,35 +0,0 @@
|
|||||||
package cafe.kirameki.minikuraVelocity.listeners
|
|
||||||
|
|
||||||
import cafe.kirameki.minikuraVelocity.store.ServerDataStore
|
|
||||||
import com.velocitypowered.api.event.Subscribe
|
|
||||||
import com.velocitypowered.api.event.player.PlayerChooseInitialServerEvent
|
|
||||||
import com.velocitypowered.api.proxy.server.RegisteredServer
|
|
||||||
import org.slf4j.Logger
|
|
||||||
|
|
||||||
class ServerConnectionHandler(
|
|
||||||
private val servers: Map<String, RegisteredServer>,
|
|
||||||
private val logger: Logger
|
|
||||||
) {
|
|
||||||
@Subscribe
|
|
||||||
fun onPlayerChooseInitialServer(event: PlayerChooseInitialServerEvent) {
|
|
||||||
val player = event.player
|
|
||||||
val currentServer = event.initialServer;
|
|
||||||
|
|
||||||
val sortedServers = ServerDataStore.getServers()
|
|
||||||
.filter { it.join_priority != null }
|
|
||||||
.sortedBy { it.join_priority }
|
|
||||||
.mapNotNull { servers[it.name] }
|
|
||||||
|
|
||||||
System.out.println("Sorted servers: ${sortedServers.map { it.serverInfo.name }}")
|
|
||||||
|
|
||||||
for (server in sortedServers) {
|
|
||||||
if (server != currentServer) {
|
|
||||||
logger.info("Attempting to connect ${player.username} to server ${server.serverInfo.name}")
|
|
||||||
event.setInitialServer(server)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.warn("No available servers with valid join_priority for ${player.username}")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -0,0 +1,121 @@
|
|||||||
|
package cafe.kirameki.minikuraVelocity.utils
|
||||||
|
|
||||||
|
import cafe.kirameki.minikuraVelocity.Main
|
||||||
|
import cafe.kirameki.minikuraVelocity.models.ReverseProxyServerData
|
||||||
|
import cafe.kirameki.minikuraVelocity.store.ServerDataStore
|
||||||
|
import com.auth0.jwt.JWT
|
||||||
|
import com.auth0.jwt.algorithms.Algorithm
|
||||||
|
import com.imaginarycode.minecraft.redisbungee.RedisBungeeAPI
|
||||||
|
import com.velocitypowered.api.proxy.ProxyServer
|
||||||
|
import com.velocitypowered.api.scheduler.ScheduledTask
|
||||||
|
import com.velocitypowered.api.scheduler.TaskStatus
|
||||||
|
import net.kyori.adventure.key.Key
|
||||||
|
import net.kyori.adventure.text.Component
|
||||||
|
import org.slf4j.Logger
|
||||||
|
import java.net.InetSocketAddress
|
||||||
|
import java.util.*
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean
|
||||||
|
import java.util.function.Consumer
|
||||||
|
|
||||||
|
object ProxyTransferUtils {
|
||||||
|
private val redisBungeeApi = RedisBungeeAPI.getRedisBungeeApi()
|
||||||
|
private var jwtSecret = System.getenv("MINIKURA_JWT_SECRET") ?: "secret"
|
||||||
|
private val jwtAlgorithm = Algorithm.HMAC256(jwtSecret)
|
||||||
|
lateinit var server: ProxyServer
|
||||||
|
lateinit var plugin: Main
|
||||||
|
lateinit var logger: Logger
|
||||||
|
lateinit var acceptingTransfers: AtomicBoolean
|
||||||
|
|
||||||
|
fun migratePlayersToServer(targetServer: ReverseProxyServerData, disconnectFailed : Boolean = false): ScheduledTask {
|
||||||
|
val targetAddress = InetSocketAddress(targetServer.address, targetServer.port)
|
||||||
|
val currentProxyName = redisBungeeApi.proxyId
|
||||||
|
|
||||||
|
val players = server.allPlayers.toList()
|
||||||
|
val batchSize = (players.size * 0.05).coerceAtLeast(1.0).toInt() // 5% of players per batch to avoid overloading the server
|
||||||
|
|
||||||
|
var currentIndex = 0
|
||||||
|
|
||||||
|
for (player in players) {
|
||||||
|
player.sendMessage(Component.text("Proxy transfer in progress..."))
|
||||||
|
}
|
||||||
|
|
||||||
|
val taskId = server.scheduler.buildTask(plugin, Consumer { scheduledTask ->
|
||||||
|
if (currentIndex >= players.size) {
|
||||||
|
scheduledTask.cancel()
|
||||||
|
return@Consumer
|
||||||
|
}
|
||||||
|
|
||||||
|
val batch = players.subList(currentIndex, (currentIndex + batchSize).coerceAtMost(players.size))
|
||||||
|
currentIndex += batchSize
|
||||||
|
|
||||||
|
batch.forEach { player ->
|
||||||
|
val currentServer = player.currentServer.orElse(null)
|
||||||
|
try {
|
||||||
|
if (currentServer == null || !player.isActive) return@forEach
|
||||||
|
|
||||||
|
val token = JWT.create()
|
||||||
|
.withIssuer("minikura")
|
||||||
|
.withClaim("uuid", player.uniqueId.toString())
|
||||||
|
.withClaim("server", currentServer.serverInfo.name)
|
||||||
|
.withClaim("origin", currentProxyName)
|
||||||
|
.withExpiresAt(Date(System.currentTimeMillis() + 60 * 5 * 1000)) // Token expires in 5 minutes
|
||||||
|
.sign(jwtAlgorithm)
|
||||||
|
|
||||||
|
player.storeCookie(Key.key("minikura", "transfer_packet"), token.toByteArray())
|
||||||
|
player.transferToHost(targetAddress)
|
||||||
|
|
||||||
|
} catch (e: IllegalArgumentException) {
|
||||||
|
logger.error("Player ${player.username} client does not support transfers")
|
||||||
|
if (disconnectFailed) {
|
||||||
|
player.disconnect(Component.text("Failed to migrate you to the target server. Please reconnect."))
|
||||||
|
}
|
||||||
|
} catch (e: Exception) {
|
||||||
|
logger.error("Error migrating player ${player.username} to server ${targetServer.name}: ${e.message}", e)
|
||||||
|
if (disconnectFailed) {
|
||||||
|
player.disconnect(Component.text("Failed to migrate you to the target server. Please reconnect."))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.delay(3, TimeUnit.SECONDS)
|
||||||
|
.repeat(3, TimeUnit.SECONDS)
|
||||||
|
.schedule()
|
||||||
|
|
||||||
|
return taskId
|
||||||
|
}
|
||||||
|
|
||||||
|
fun endProxy() {
|
||||||
|
acceptingTransfers.set(false)
|
||||||
|
logger.info("Proxy is being shut down. No new connections or transfers will be accepted.")
|
||||||
|
|
||||||
|
val allProxies = redisBungeeApi.allProxies
|
||||||
|
.filter { it != redisBungeeApi.proxyId }
|
||||||
|
|
||||||
|
if (allProxies.isEmpty()) {
|
||||||
|
for (player in server.allPlayers) {
|
||||||
|
player.disconnect(Component.text("The proxy is being shut down. No other proxies are available."))
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Load balance players across proxies
|
||||||
|
val nextProxy = allProxies.first()
|
||||||
|
val targetServer = ServerDataStore.getReverseProxyServer(nextProxy)
|
||||||
|
if (targetServer != null) {
|
||||||
|
val task = migratePlayersToServer(targetServer, true)
|
||||||
|
|
||||||
|
server.scheduler.buildTask(plugin, Runnable {
|
||||||
|
if (task.status() != TaskStatus.FINISHED && task.status() != TaskStatus.CANCELLED) {
|
||||||
|
return@Runnable
|
||||||
|
}
|
||||||
|
|
||||||
|
for (player in server.allPlayers) {
|
||||||
|
player.disconnect(Component.text("The proxy is being shut down. Please reconnect."))
|
||||||
|
}
|
||||||
|
|
||||||
|
server.shutdown()
|
||||||
|
}).repeat(5, TimeUnit.SECONDS).schedule()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user