Thread pools & Executors

原文:

http:\/\/givemepass-blog.logdown.com\/posts\/296960-how-to-use-the-threadpool

Java的Executor框架為Thread以及使用的資源有更進一步的控制, java.util.concurrent 提供一方便的介面 Executor 來處理 multithread 任務,目標是將任務的建立跟執行分開。

Executor 為主要介面,ExecutorService 延伸其功能。

  • 如果不須回傳值,則可使用 Runnable 介面。
  • 若有回傳值則可使用 Callable 介面。
  • Future 則為執行完的回傳值,若是 Runnable的話,Future 回傳為 null。

public interface Executor{
    void execute(Runnable runnable);
}

public class SimpleExecutor extends Executor{
    public void execute(Runnable runnable){
        new Thread(runnable).start();
    }
}

不過一般我們不會自己設計Executor,Java提供了ThreadPoolExecutor讓我們使用, 有幾個優點:

  • Thread能保持存活, 等待新任務, 不會隨著任務建立再銷毀。
  • Thread Pool限制最大Thread數量, 避免系統浪費。
  • Thread的生命週期被Thread Pool控制。

ThreadPoolExecutor可以自行定義一些設定。

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    int corePoolSize,
    int maxPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue
);
  • core pool size(核心緩衝池數量):

    Thread pool的Thread下限, Thread數量不會低於這個數字,

  • maxumum pool size(最大緩衝池數量):

    Thread pool的Thread最大數量, 如果Thread都被執行, 則Task會被塞進Queue直到有空閒的Thread出現為止。

    比較好的做法是根據底層硬體來決定數量。

int N = Runtime.getRuntime().availableProcessors();
  • keep-alive time(最大閒置時間):

    如果超過閒置時間, 則系統會回收core Thread數量以上的Thread。

  • task queue type(任務佇列類型):

    被加進來的Task, 根據策略不同, 所用的演算法也會不同。

利用allowCoreThreadTimeOut(true)這個方法, 可以在Core Thread閒置的時候, 讓系統回收。

int N = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    N,
    N * 2,
    60L,
    TimeUnit.SECONDS,
    new LinkedBlockingQueue<Runnable>());
executor.allowCoreThreadTimeOut(true);

這邊有一個重點可能需要注意一下。

如果一開始

BlockingQueue<Runnable> rList = new LinkedBlockingQueue<Runnable>();
rList.add(new Runnable(){
    @Override
    public void run() {
        //run 1

    }    
});
rList.add(new Runnable(){
    @Override
    public void run() {
        //run 2

    }    
});
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    1,
    2,
    1,
    TimeUnit.SECONDS,
    rList);

一開始Thread Pool剛建立Thread,此時並沒有任何Thread可以執行Task,因此所有的Task將會被丟進Queue內等待,直到有新的Task後來又被加入,才會連同之前等待的Task一起執行。

如果要解決這個問題可以預先建立core thread。

executor.prestartCoreThread();
//or

executor.prestartAllCoreThreads();

呼叫其中一個方法可以先建立core thread。

另外還有一個陷阱就是如果core thread設定為0,那麼講無論如何就必須等有其他的task再被加到Thread Pool內才會一起把Queue內的task執行。

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    0,
    2,
    1,
    TimeUnit.SECONDS,
    rList);

如果你不想要自建Excutor,Java有內建一些Executor。

ExecutorService fixExecutor = Executors.newFixedThreadPool(2);
ExecutorService cacheExecutor = Executors.newCachedThreadPool();
ExecutorService singleExecutor = Executors.newSingleThreadExecutor();

分別是

  • 固定尺寸執行器(newFixedThreadPool)

    只要固定好數量, 假設是2, 那就代表Thread永遠固定是2個, 特性是用完就丟, 執行新任務會再開新的Thread。

  • 動態尺寸執行器(newCachedThreadPool)

    Thread會隨著任務多寡, 新增或刪除, 假設一個Thread被閒置60秒, 系統則會進行移除。

  • 單一執行緒執行器(newSingleThreadExecutor)

    這個執行器最多只會有一個Thread, 因此是執行緒安全的, 但是相對效率會下降, 因為Task會被阻塞。

  • 還有

    Executor pool = Executors.newScheduledThreadPool(10) ; //建立一個排程的thread pool,大小為10
    Executor pool = Executors.newSingleScheduleThreadExecutor() ; //建立一個排程的單一執行緒
    

透過Callable搭配Future可以讓我們更方便管理執行緒。

ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Object> future = executor.submit(new Callable<Object>(){
        public Object call() throws Exception{
            Object obj = doLongTask();
        return obj;
    }
});

//wait future

while(future.isDone());
Object result = future.get();

與Runnable不同的是Callable可以回傳結果,透過blocking直到long task完成, 回傳物件。

Executor提供同時多個Thread並行的操作

  • InvokeAll:同時並行多個Thread, 並且透過blocking來取回每一個Task的結果。

  • InvokeAny:同時並行多個Thread, 只要有一個回傳成功, 則終止剩下的Task。

也可以利用ExecutorCompletionService來查詢所完成的任務,他會將完成任務的結果放置BlockingQueue內,透過polling的方式, 來查詢任務是否完成。

沿伸閱讀: ExecutorCompletionService

results matching ""

    No results matching ""