使用 Tomcat 7 與 Java Servlet 3.0 API 實作 Asynchronous Servlets:提升伺服器效率的方案

在 Java Servlet 3.0 標準推出之前,如果想要實作 asynchronous 的 servlet 必須使用像 Comet 這樣的架構,而現在 Servlet 3.0 API 直接支援 asynchronous 與 synchronous 兩種模式,而因為這是公開的標準,所以寫好的 Servlet 可以很方便的移植到各種符合 Servlet 3.0 的 app server 中(例如 Tomcat 7 或是 GlassFish 3.x)。

在使用 synchronous servlets 時,處理 client HTTP request 的執行序(thread)會跟處理 request 的過程綁在一起,而在處理工作量比較大、等待時間比較久的 request 時(通常在等待外部的 IO 或是其他耗時的動作),該執行序就只能等待工作完成,才能執行下一步,這樣的狀況如果在 request 數量很多的時候,就有可能會造成伺服器的執行序不足的問題,進而影響伺服器整體的效能。

這種情況主要是由於伺服器上的執行序都在等待外部的工作完成(其實這時候它們都在休息),但是從 client 來的 request 數量又太多時,就會很容易把有限的執行序都佔滿,造成伺服器滿載的狀況,但是其實伺服器的執行序都在休息!

Asynchronous servlets 基本上就是為了處理這樣的狀況而設計的,它讓處理 request 的執行序要等待外部工作的時候,可以去處理其他的 request,等到外部工作完成之後,伺服器會找另外一個執行序來把處理完成的外部工作結果送回 client,在這種架構下,client 不會感覺有任何不同,因為這些執行序與工作的分配都是在伺服器上面發生的,也只跟伺服器本身有關,所有的 request 資料還是跟以前一樣。

要達到這樣的工作處理流程,最關鍵的部分就是在外部工作完成時,要如何通知伺服器來領回處理完成的結果,這裡是它所使用的方式是透過 callback 函數的方式(這個 callback 函數是由伺服器本身提供)。

Pseudo Code

以下是伺服器處理 request 的 pseudo code:
  1. client 的 request 透過 HTTP 協定送到伺服器上,然後 request 被伺服器配送(dispatch)給某一個 servlet 處理。
  2. 伺服器上的某個執行序會執行 servlet 的 service() 函數來處理 request。
  3. service() 函數必須建立一個 AsyncContext 物件(使用 startAsync() 函數)。
  4. service() 函數必須建立一個 Runnable 物件,並傳遞給另一個執行序的 Executor 以執行真正要做的工作。Runnable 物件中會包含一個 AsyncContext 的 reference(因為在工作執行完成時,要通知伺服器領回)。
  5. service() 函數執行結束。(這看起來不太直覺,但是在 asynchronous 的架構就是這樣)
在這個時候 service() 函數雖然執行結束了,但是因為真正的工作還在另外一個執行序中(Executor 物件)處理,所以 client 事實上還在等待,等到真正的工作完成時,伺服器就會接著下面的動作:
  1. Executor 的工作完成時,會通知 AsyncContext:它會將結果寫進 HttpResponse 中,然後呼叫 AsyncContextcomplete() 函數,這樣就會促使伺服器把處理完成的結果送回 client。

出問題時怎麼辦?

如果在 Runnable 中所執行的工作出錯時,這時候 Runnable 還沒把結果傳回給 Executor,client 在這種情況下只會收到某種一般的網路錯誤訊息,如果想要自己處理可能發生的錯誤,提供詳細的錯誤訊息,可以用下面的方式:
  1. 設置好 timeout 的時間上限,如果 Runnable 在設定的 timeout 時間內沒有正常完成工作的話,就會發出 timeout 的錯誤。
  2. 自己加入專門處理 timeout 錯誤的 listener 來處理這樣的錯誤。
這樣的話,如果 Runnable 在處理工作時出現錯誤,超過我們指定的 timeout 時間沒有完成時,則我們自己撰寫的 listener 就會被呼叫,通知你發生 timeout 錯誤了,這時候你就可以在 listener 中產生要給使用者看的錯誤訊息。如果這時候 Runnable 還繼續嘗試寫入 HttpResponse 物件時或是呼叫 AsyncContext.complete() 函數,都會產生例外(exception),基本上在這裡應該要把之前所產生的輸出全部丟棄,只輸出錯誤訊息。

Asynchronous Servlet 範例程式

以下是兩個 asynchronous servlets 範例程式,一個是比較簡單的範例,示範最基本的 asynchronous servlet 的撰寫方式。而另一個則是比較複雜的範例,發生 timeout 錯誤時,該怎麼處理。

基本上只要符合 Java Servlet 3.0 規格的伺服器都應該可以執行這裡的範例程式,而這裡的伺服器環境是使用 Tomcat 7。

Example 1

這個範例是示範基本的 asynchronous servlets 與 web.xml 內容(這也是 Servlet 3.0 的內容之一)。以下是伺服器端的 servlet 程式碼:
@javax.servlet.annotation.WebServlet(
  // servlet name
  name = "simple",
  // servlet url pattern
  value = {"/simple"},
  // async support needed
  asyncSupported = true
)
public class SimpleAsyncServlet extends HttpServlet {

  /**
   * Simply spawn a new thread (from the app server's pool) for every new async request.
   * Will consume a lot more threads for many concurrent requests.
   */
  public void service(ServletRequest req, final ServletResponse res)
      throws ServletException, IOException {

    // create the async context, otherwise getAsyncContext() will be null
    final AsyncContext ctx = req.startAsync();

    // set the timeout
    ctx.setTimeout(30000);

    // attach listener to respond to lifecycle events of this AsyncContext
    ctx.addListener(new AsyncListener() {
      public void onComplete(AsyncEvent event) throws IOException {
        log("onComplete called");
      }
      public void onTimeout(AsyncEvent event) throws IOException {
        log("onTimeout called");
      }
      public void onError(AsyncEvent event) throws IOException {
        log("onError called");
      }
      public void onStartAsync(AsyncEvent event) throws IOException {
        log("onStartAsync called");
      }
    });

    // spawn some task in a background thread
    ctx.start(new Runnable() {
      public void run() {

        try {
          ctx.getResponse().getWriter().write(
              MessageFormat.format("<h1>Processing task in bgt_id:[{0}]</h1>",
                                   Thread.currentThread().getId()));
        }
        catch (IOException e) {
          log("Problem processing task", e);
        }

        ctx.complete();
      }
    });

  }

}
這段程式碼有一些需要注意的地方:
  1. 你可以使用 annotation 指定 servlet 名稱與其 URL 位址,而不用額外撰寫 web.xml 的內容。
  2. 這裡必須在 annotation 中加入 asyncSupported=true 告訴伺服器這個 servlet 需要 asynchronous 模式。
  3. service() 函數中,timeout 的時間設定為 30 秒,所以如果 Runnable 在 30 秒之內做完所有的工作並且呼叫 complete(),就不會有 timeout 錯誤產生。
  4. Runnable 物件將真正要做的主要工作(也是最耗時的部分)包裝起來,交給另外一個執行序執行。
  5. 在這理 Runnable 所負責的工作是取得目前執行序的 ID,然後交給伺服器傳回到 client 端。
  6. 這裡的 AsyncContext listener 並沒有做什麼事情,只是單純將訊息寫入伺服器的記錄中而已。

以下是 client 端的 Java 測試程式碼:
public class LoadTester {
  public static final AtomicInteger counter = new AtomicInteger(0);
  public static final int maxThreadCount = 100;

  public static void main(String[] args) throws InterruptedException {
    new LoadTester();
  }

  public LoadTester() throws InterruptedException {
    // call simple servlet
    ExecutorService exec1 = Executors.newCachedThreadPool();
    for (int i = 0; i < maxThreadCount; i++) {
      exec1.submit(new UrlReaderTask("http://localhost:8080/test/simple"));
    }
    exec1.shutdown();

    Thread.currentThread().sleep(5000);
    System.out.println("....NEXT....");

    // call complex servlet
    counter.set(0);
    ExecutorService exec2 = Executors.newCachedThreadPool();
    for (int i = 0; i < maxThreadCount; i++) {
      exec2.submit(new UrlReaderTask("http://localhost:8080/test/complex"));
    }
    exec2.awaitTermination(1, TimeUnit.DAYS);
  }

  public class UrlReaderTask implements Runnable {
    private String endpoint;
    public UrlReaderTask(String s) {
      endpoint = s;
    }
    public void run() {
      try {
        actuallyrun();
      }
      catch (Exception e) {
        System.err.println(e.toString());
      }
    }
    public void actuallyrun() throws Exception {
      int count = counter.addAndGet(1);
      BufferedReader in = new BufferedReader(
          new InputStreamReader(new URL(endpoint).openStream()));
      String inputLine;
      while ((inputLine = in.readLine()) != null) {
        System.out.println(MessageFormat.format("thread[{0}] : {1} : {2}",
                                                count, inputLine, endpoint));
      }
      in.close();
    }
  }
} //end class ComplexLoadTester
這個簡單的 Java 程式會產生 100 個執行序,同時建立 HTTP 的連線連到我們寫的 asynchronous servlets,然後輸出從伺服器上取得的執行序 ID 資訊。

在這個範例中,我們取得的執行序 ID 會比較雜亂,因為這些執行序是來自於 Tomcat 7 伺服器的 thread pool。

Example 2

這個比較複雜的範例是將上面的範例再做一些改變:

  1. 這個範例中,servlet 使用 fixed thread pool 管理自己的 thread pool,而 thread pool 的大小在這裡是使用 annotation 透過 servlet 的 init parameter 將預設值設定為 3。
  2. 在前 4 個 requests 中會模擬出錯的狀況,在 service() 中產生例外(exception)。
  3. 在模擬耗時的工作部分,我們使用 sleep() 函數讓程式休息 0 到 5 秒(隨機產生),而 AsyncContext 的 timeout 的時間上限則設為 60 秒,所以大約最後的 20 筆 requests 會出現 timeout 的錯誤,因為我們總共只有 3 個執行序可以同時處理 100 筆 requests,排在後面的 requests 就會來不及在 60 秒之內處理完。
  4. 如果在 timeout 錯誤發生時,AsyncContext 的 listener 必須要自己呼叫 AsyncContext.complete() 函數。
  5. 一旦 timeout 錯誤發生,伺服器就會將 AsyncContext 中的 HttpRequestHttpResponse 物件設為 invalid,透過這樣的方式通知正在執行的工作說 AsyncContext 已經是 invalid 的狀態了,這也是為什麼在 Runnable 中執行完成後,在寫入 response 之前,要先檢查它是不是 null,這個動作請記得一定要做,因為在 timeout 錯誤發生時,Runnable 這邊正在執行的工作可能無法得知 timeout 的錯誤,只能靠檢查 request 與 response 是否為 null 的方式來判斷,如果檢查出來是 null,則所有正在執行的工作都可以終止了,因為這個時候 AsyncContext 的 listener 或伺服器已經將 timeout 的錯誤訊息傳給 client 端了,縱使你把所有的工作算完也沒有用了。
以下是這個範例的程式碼:
@javax.servlet.annotation.WebServlet(
    // servlet name
    name = "complex",
    // servlet url pattern
    value = {"/complex"},
    // async support needed
    asyncSupported = true,
    // servlet init params
    initParams = {
        @WebInitParam(name = "threadpoolsize", value = "3")
    }
)
public class ComplexAsyncServlet extends HttpServlet {
  public static final AtomicInteger counter = new AtomicInteger(0);
  public static final int CALLBACK_TIMEOUT = 60000;
  public static final int MAX_SIMULATED_TASK_LENGTH_MS = 5000;

  /** executor svc */
  private ExecutorService exec;

  /** create the executor */
  public void init() throws ServletException {
    int size = Integer.parseInt(
        getInitParameter("threadpoolsize"));
    exec = Executors.newFixedThreadPool(size);
  }

  /** destroy the executor */
  public void destroy() {
    exec.shutdown();
  }

  /**
   * Spawn the task on the provided {@link #exec} object.
   * This limits the max number of threads in the
   * pool that can be spawned and puts a ceiling on
   * the max number of threads that can be used to
   * the init param "threadpoolsize".
   */
  public void service(final ServletRequest req, final ServletResponse res)
      throws ServletException, IOException {
    // create the async context, otherwise getAsyncContext() will be null
    final AsyncContext ctx = req.startAsync();
    // set the timeout
    ctx.setTimeout(CALLBACK_TIMEOUT);
    // attach listener to respond to lifecycle events of this AsyncContext
    ctx.addListener(new AsyncListener() {
      /** complete() has already been called on the async context, nothing to do */
      public void onComplete(AsyncEvent event) throws IOException { }
      /** timeout has occured in async task... handle it */
      public void onTimeout(AsyncEvent event) throws IOException {
        log("onTimeout called");
        log(event.toString());
        ctx.getResponse().getWriter().write("TIMEOUT");
        ctx.complete();
      }
      /** THIS NEVER GETS CALLED - error has occured in async task... handle it */
      public void onError(AsyncEvent event) throws IOException {
        log("onError called");
        log(event.toString());
        ctx.getResponse().getWriter().write("ERROR");
        ctx.complete();
      }
      /** async context has started, nothing to do */
      public void onStartAsync(AsyncEvent event) throws IOException { }
    });
    // simulate error - this does not cause onError - causes network error on client side
    if (counter.addAndGet(1) < 5) {
      throw new IndexOutOfBoundsException("Simulated error");
    } else {
      // spawn some task to be run in executor
      enqueLongRunningTask(ctx);
    }
  }

  /**
   * if something goes wrong in the task, it simply causes timeout condition
   * that causes the async context listener to be invoked (after the fact)
   * <p/>
   * if the {@link AsyncContext#getResponse()} is null, that means this
   * context has already timedout (and context listener has been invoked).
   */
  private void enqueLongRunningTask(final AsyncContext ctx) {
    exec.execute(new Runnable() {
      public void run() {
        try {
          // simulate random delay
          int delay = new Random().nextInt(MAX_SIMULATED_TASK_LENGTH_MS);
          Thread.currentThread().sleep(delay);
          // response is null if the context has already timedout
          // (at this point the app server has called the listener already)
          ServletResponse response = ctx.getResponse();
          if (response != null) {
            response.getWriter().write(
                MessageFormat.format(
                    "<h1>Processing task in bgt_id:[{0}], delay:{1}</h1>",
                        Thread.currentThread().getId(), delay)
            );
            ctx.complete();
          }
          else {
            throw new IllegalStateException(
                "Response object from context is null!");
          }
        }
        catch (Exception e) {
          log("Problem processing task", e);
          e.printStackTrace();
        }
      }
    });
  }
}
這個範例的 client 程式是與上一個範例共用的,也就是說我們是使用一個 client 程式同時測試兩個不同的 servlets。

當你使用 client 程式同時產生 100 筆 requests 來測試這個比較複雜的範例的時候,有幾個地方是需要注意的:
  1. 這個較複雜的範例最多只會同時使用 3 條執行序來處理 100 筆 requests。
  2. 在這個比較複雜的 servlet 中每一筆 request 最多會花費五秒鐘來處理,大約有 20% 的 requests 會發生 timeout 錯誤(因為 timeout 時間上限設為 60 秒,所有的 requests 都是一起進入伺服器中的,然後排在伺服器中的 queue 中等待處理)。
  3. 縱使 100 筆 requests 可以一起進入伺服器,但是在同一個時間只有 3 個執行序在處理工作。
  4. 在 timeout 錯誤發生之後,即使 AsyncContext 的 listener 已經呼叫 complete() 並將錯誤訊息送給 client 了,但是 Runnable 還是會繼續執行它的工作,直到最後產生錯誤之後將結果丟棄,這個情況是程式設計者在撰寫 asynchronous servlet 時需要謹記在心的,如果 timeout 錯誤發生後,還讓這樣的工作在背後跑,是會浪費 CPU 與記憶體的資源的(在 Java 中的執行序並沒有優先權的設定,所有的執行序都會一起執行),所以最好在撰寫這樣的程式時,要在執行工作時,順便檢查一下是否有 timeout 的情況,以避免這樣的狀況發生。
  5. 當伺服處理器前四筆 requests 時,會產生我們設定好的錯誤例外,這時候在 client 端也會產生對應的網路例外。

到底要不要使用 Asynchronous Servlet?

asynchronous servlet 所帶給伺服器的效能提昇是顯而易見的,而且基本上只要你對於 asynchronous 的處理流程夠了解,Servlet API 3.0 所提供的 asynchronous API 應該是很容易使用的,所以如果你的伺服器端會常常處理耗時的工作,就可以改用這樣的架構。

但如果你沒有搞清楚 asynchronous 的處理方式,則面對這些不太直覺的 callback 使用方式,可能會讓你更頭痛,縱使使用 asynchronous servlet 可以讓伺服器的效能提昇,但是程式開發者還是要很了解這個架構的運行方式,以免為了提昇效能反而造成程式出錯。


除了 asynchronous 之外,Tomcat 7 與 Servlet API 3.0 也使用一些 annotations 可以讓設定 servlet 更容易,並且還可以動態載入 servlet(loading servlets programmatically),有興趣的人可以自己查詢相關的資料。

參考資料:Developer LifeRamesh PVKJSR-000315 Java Servlet 3.0
本站已經搬家了,欲查看最新的文章,請至 G. T. Wang 新網站

沒有留言:

張貼留言