Blog

undertow と graceful shutdown と

undertow を利用する予定は特にないのだが、undertow だと graceful shutdown はどのように実現可能なのだろうか、ということが気になったので調べてみました。 ここでいう graceful shutdown は listen socket を close したうえで、処理をすべて正常に終了し、終了後にプロセスを exit するようなものを指しています。

いくつかハマりどころがあるので注意。

undertow はドキュメントが貧弱

どうも、貧弱ですね。。ソース読めないと厳しい。ソースか javadoc 眺めて探すとかしないと見つからない。利用方法ものってないからテストから探すとかしないといけない。

DeploymentManager.stop() を呼ばないと Servlet#destroy が呼ばれない

これは、Java EE になれた人だと常識なんだろうけど、DeploymentManager.stop() を明示的に呼ばないと Servlet#destroy が呼ばれないんでクリーンアップ処理が正常に処理されません。shutdown まわりの正しい呼び出し手順が undertow のマニュアルには載ってないので割と困る。

frsyuki++ に教えてもらいました。

GracefulShutdownHandler を利用することで graceful shutdown が可能。

可能なのだが、ドキュメントがないのでテストコードとソースコードから把握する必要がある。

まず、GracefulShtudownHandler#shutdown を呼ぶと shutdown 状態になります。shutdown 状態になるとすべてのリクエストに 503 を返すようになります。 GracefulShutdownHandler#awaitShutdown を呼ぶと、すべてのリクエストが終了するまで待ってくれます。すべてのリクエストが止まったあとにサーバーを止めればOK、ということっぽいです。

しかし、HTTP レベルでのレスポンスを返してしまうので load balancer になんとなくバランスさせるとかは難しい。明示的に load balancer から外すようにしないとダメでしょう(そして、明示的に外すのは nginx だとちょっと面倒なことしないとできない)。health check モジュール入れて、health check の結果に false を返したあとで GracefulShutdownHandler#shutdown 呼んで、、とかすればできる。

ちょっと頑張れば graceful shutdown できる

という順番で閉じていけばOK。

Undertow.stop() は AcceptingChannel を close して worker を shutdown するところまで一気にやってしまうので使えない。 もともと Undertow クラスは、便利クラスってだけなので無理して利用しなくても良い。

サンプルコードは以下。

package com.example;

import io.undertow.Handlers;
import io.undertow.Undertow;
import io.undertow.UndertowOptions;
import io.undertow.connector.ByteBufferPool;
import io.undertow.server.DefaultByteBufferPool;
import io.undertow.server.handlers.GracefulShutdownHandler;
import io.undertow.server.handlers.PathHandler;
import io.undertow.server.protocol.http.HttpOpenListener;
import io.undertow.servlet.Servlets;
import io.undertow.servlet.api.DeploymentInfo;
import io.undertow.servlet.api.DeploymentManager;
import io.undertow.servlet.api.InstanceHandle;
import lombok.extern.slf4j.Slf4j;
import org.xnio.*;
import org.xnio.channels.AcceptingChannel;

import javax.servlet.AsyncContext;
import javax.servlet.Servlet;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.Inet4Address;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class UndertowApp {
    public static void main(String[] args) throws ServletException, InterruptedException, IOException {
        new UndertowApp().run();
    }

    private void run() throws ServletException, InterruptedException, IOException {
        CountDownLatch latch = new CountDownLatch(1);

        DeploymentInfo servletBuilder = Servlets.deployment()
                .setClassLoader(UndertowApp.class.getClassLoader())
                .setContextPath("/")
                .setDeploymentName("async.war")
                .addServlets(
                        Servlets.servlet("MessageServlet", AsyncResponseServlet.class, () -> new InstanceHandle<Servlet>() {
                            @Override
                            public Servlet getInstance() {
                                log.info("Getting instance");
                                return new AsyncResponseServlet(latch);
                            }

                            @Override
                            public void release() {
                                /* nop */
                            }
                        })
                                .setAsyncSupported(true)
                                .addInitParam("message", "Hello World")
                                .addMapping("/*")
                );

        DeploymentManager manager = Servlets.defaultContainer()
                .addDeployment(servletBuilder);
        manager.deploy();
        PathHandler path = Handlers
                .path()
                .addExactPath("/", manager.start());

        int ioThreads = Math.max(Runtime.getRuntime().availableProcessors(), 2);
        int workerThreads = ioThreads * 8;

        Xnio xnio = Xnio.getInstance(Undertow.class.getClassLoader());

        XnioWorker worker = xnio.createWorker(OptionMap.builder()
                .set(Options.WORKER_IO_THREADS, ioThreads)
                .set(Options.CONNECTION_HIGH_WATER, 1000000)
                .set(Options.CONNECTION_LOW_WATER, 1000000)
                .set(Options.WORKER_TASK_CORE_THREADS, workerThreads)
                .set(Options.WORKER_TASK_MAX_THREADS, workerThreads)
                .set(Options.TCP_NODELAY, true)
                .set(Options.CORK, true)
                .getMap());

        OptionMap socketOptions = OptionMap.builder()
                .set(Options.WORKER_IO_THREADS, ioThreads)
                .set(Options.TCP_NODELAY, true)
                .set(Options.REUSE_ADDRESSES, true)
                .set(Options.BALANCING_TOKENS, 1)
                .set(Options.BALANCING_CONNECTIONS, 2)
                .set(Options.BACKLOG, 1000)
                .getMap();

        OptionMap serverOptions = OptionMap.builder()
                .set(UndertowOptions.NO_REQUEST_TIMEOUT, 60000000)
                .getMap();

        ByteBufferPool buffers = new DefaultByteBufferPool(true, 1024 * 16, -1, 4);

        GracefulShutdownHandler gracefulShutdownHandler = Handlers.gracefulShutdown(path);

        OptionMap undertowOptions = OptionMap.builder().set(UndertowOptions.BUFFER_PIPELINED_DATA, true).addAll(serverOptions).getMap();
        HttpOpenListener openListener = new HttpOpenListener(buffers, undertowOptions);
        openListener.setRootHandler(gracefulShutdownHandler);
        ChannelListener<AcceptingChannel<StreamConnection>> acceptListener = ChannelListeners.openListenerAdapter(openListener);
        AcceptingChannel<? extends StreamConnection> server = worker.createStreamConnectionServer(new InetSocketAddress(Inet4Address.getByName("localhost"), 18080), acceptListener, socketOptions);
        server.resumeAccepts();

        log.info("Send request");
        Thread clientThread = new Thread(this::startClient);
        clientThread.setName("http client");
        clientThread.start();

        log.info("Waiting request");
        latch.await();

        log.info("Stopping listening channel");
        IoUtils.safeClose(server);

        log.info("Entering shutdown state");
        gracefulShutdownHandler.shutdown();

        log.info("Await all requests(7sec)");
        gracefulShutdownHandler.awaitShutdown(7 * 1000);

        log.info("Shutdown workers");
        worker.shutdown();

        log.info("Stopped");

        manager.stop();
        manager.undeploy();

        log.info("Undeployed");

        log.info("joining client thread");
        clientThread.join();
    }

    private void startClient() {
        try (Socket clientSocket = new Socket("localhost", 18080);
             DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream())
        ) {
            outToServer.write("GET / HTTP/1.0\015\012Content-Length: 0\015\012\015\012".getBytes(StandardCharsets.UTF_8));
            log.info("Sent request");
            clientSocket.shutdownOutput();

            StringBuilder builder = new StringBuilder();
            while (true) {
                byte[] buf = new byte[1024];
                int read = clientSocket.getInputStream().read(buf);
                if (read == -1) {
                    log.info("Got response: {}, {},{},{}, {}",
                            builder.toString(),
                            clientSocket.isConnected(),
                            clientSocket.isBound(),
                            clientSocket.isInputShutdown(),
                            clientSocket.isClosed());
                    break;
                }
                builder.append(new String(buf, 0, read)).append("\\n");
            }
        } catch (IOException e) {
            log.error("IOException", e);
            throw new UncheckedIOException(e);
        }
    }

    @Slf4j
    public static class AsyncResponseServlet extends HttpServlet {
        private final ExecutorService pool = Executors.newFixedThreadPool(10);
        private final CountDownLatch latch;

        public AsyncResponseServlet(CountDownLatch latch) {
            this.latch = latch;
        }


        @Override
        protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
            AsyncResponseServlet.log.info("Got request: {}", req.getPathInfo());
            this.latch.countDown();

            AsyncContext asyncContext = req.startAsync();

            pool.submit(() -> {
                AsyncResponseServlet.log.info("Sleeping...");
                try {
                    Thread.sleep(3L * 1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }

                AsyncResponseServlet.log.info("Sending response");
                resp.setStatus(200);
                try {
                    resp.getWriter().print("OK\n");
                } catch (IOException e) {
                    AsyncResponseServlet.log.error("Can't send response", e);
                } finally {
                    AsyncResponseServlet.log.info("complete async thread");
                    asyncContext.complete();
                }
            });
        }

        @Override
        public void destroy() {
            AsyncResponseServlet.log.info("Shutdown servlet");
            this.pool.shutdown();
        }
    }
}