`

Java 线程池

 
阅读更多
参考文章:
http://cuisuqiang.iteye.com/blog/2019372
http://www.cnblogs.com/dolphin0520/p/3949310.html
http://hbiao68.iteye.com/blog/1929245
在项目中,系统启动一个新线程的成本是比较高的,因为它涉及与操作系统交互。在这种情形下,使用线程池可以很好地提高性能,尤其是当程序中需要创建大量生存周期很短的线程时,更应该考虑使用线程池。
使用线程池可以有效地控制系统中并发线程的数量,当系统中包含大量并发线程时,会导致系统性能剧烈下降,甚至导致JVM崩溃,而线程池的最大线程数参数可以控制系统中并发线程数不超过此数。

从JAVA5开始新增了一个Executors工具类来产生线程池,它有如下几个静态工厂方法来创建线程池。

强烈建议程序员使用较为方便的 Executors 工厂方法
Executors.newCachedThreadPool()  创建一个可根据需要创建新线程的线程池(线程池为无限大),但是在以前构造的线程可用时将重用它们。
Executors.newFixedThreadPool(int) 创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程
Executors.newSingleThreadExecutor() 创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。
Executors.newScheduledThreadPool(int corePoolSize) 创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行

ExecutorService代表尽快执行线程的线程池(只要线程池中有空闲的线程,就立即执行线程任务),
程序只要将一个Runnable对象或Callable对象(代表线程任务)提交给该线程,该线程就会尽快执行该任务

ExecutorService方法:
shutdown() 启动一次顺序关闭线程池,执行以前提交的任务,但不接受新任务。此方法不会等待任务的完成,使用awaitTermination去实现此功能。
shutdownNow() 通过调用Thread.interrupt方法试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。
但对intterupts无响应的tasks可能永远不会停止。这个方法不会等待执行中的任务完成,使用awaitTermination去实现。
awaitTermination() 阻塞直到所有的任务执行完成(包括任务被interrupted),或者等待时间已到。
如果等待时间结束前所有任务执行结束,则返回true否者返回false 
--示例1
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class cachedThreadPoolTest {
  public static void main(String[] args) {
    ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    for (int i = 0; i < 10; i++) {
      final int index = i;
      cachedThreadPool.execute(new Runnable() {
        public void run() {
          try {
            Thread.sleep(2000);
            System.out.println(index);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      });
    }
    System.out.println(cachedThreadPool.isShutdown());
    // 等待线程运行完毕之后再停止线程。
    cachedThreadPool.shutdown();
    // 强制停止线程,如果当前线程正在执行,则被强制停止。
    //cachedThreadPool.shutdownNow();
    System.out.println(cachedThreadPool.isShutdown());

  }
}
 
--示例2
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class fixedThreadPoolTest {

  /**
   * @param args
   */
  public static void main(String[] args) {
    // TODO Auto-generated method stub
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(2);
    for (int i = 0; i < 10; i++) {
      final int index = i;
      fixedThreadPool.execute(new Runnable() {
        public void run() {
          try {
            System.out.println(index);
            Thread.sleep(1000);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      });
    }   

  }
}

--示例3
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class scheduledThreadPoolTest {

  public static void main(String[] args) {
    ScheduledExecutorService scheduledThreadPool = Executors
        .newScheduledThreadPool(1);
    scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
      public void run() {
        System.out
            .println("delay 1 seconds, and excute every 3 seconds");
      }
    }, 1, 3, TimeUnit.SECONDS);
  }
}

--示例4
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class singleThreadExecutorTest {
  public static void main(String[] args) {
    ExecutorService singleThreadExecutor = Executors
        .newSingleThreadExecutor();
    for (int i = 0; i < 10; i++) {
      final int index = i;
      singleThreadExecutor.execute(new Runnable() {
        public void run() {
          try {
            System.out.println(index);
            Thread.sleep(2000);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      });
    }
  }
}
------------------------------------------------------------------------------
并发编程 Callable,Future

创建线程的2种方式,一种是直接继承Thread,另外一种就是实现Runnable接口。
这2种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。
如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦。
而自从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。
今天我们就来讨论一下Callable、Future类的使用方法

一.Callable与Runnable
先说一下java.lang.Runnable吧,它是一个接口,在它里面只声明了一个run()方法:
 public interface Runnable { 
    public abstract void run(); 
} 
由于run()方法返回值为void类型,所以在执行完任务之后无法返回任何结果。

Callable位于java.util.concurrent包下,它也是一个接口,在它里面也只声明了一个方法,只不过这个方法叫做call():
 public interface Callable<V> { 
    /** 
     * Computes a result, or throws an exception if unable to do so. 
     * 
     * @return computed result 
     * @throws Exception if unable to compute a result 
     */
    V call() throws Exception; 
} 
可以看到,这是一个泛型接口,call()函数返回的类型就是传递进来的V类型。
那么怎么使用Callable呢?一般情况下是配合ExecutorService来使用的,在ExecutorService接口中声明了若干个submit方法的重载版本:

<T> Future<T> submit(Callable<T> task); 
<T> Future<T> submit(Runnable task, T result); 
Future<?> submit(Runnable task); 
第一个submit方法里面的参数类型就是Callable。
一般情况下我们使用第一个submit方法和第三个submit方法,第二个submit方法很少使用。


二.Future

Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。
Future的实现逻辑,在调用一个Call方法时,先返回一个jia结果,保证下面的方法可以正常执行,提高效率。待调用获取真正结果的方法时,才会返回真的结果值。
Future类位于java.util.concurrent包下,它是一个接口:

 public interface Future<V> { 
    boolean cancel(boolean mayInterruptIfRunning); 
    boolean isCancelled(); 
    boolean isDone(); 
    V get() throws InterruptedException, ExecutionException; 
    V get(long timeout, TimeUnit unit) 
        throws InterruptedException, ExecutionException, TimeoutException; 
} 


cancel方法用来取消任务,参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务.
      如果任务已完成或已取消或无法取消等,则返回false.如果任务尚未启动则取消此任务的执行并返回true.
      此方法返回true的情况下,再调用isCancelled/isDone方法肯定返回true.
      
isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
isDone方法表示任务是否已经完成,若任务完成,则返回true;
get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。
  也就是说Future提供了三种功能:
  1)判断任务是否完成;
  2)能够中断任务;
  3)能够获取任务执行结果。


使用Callable+Future获取执行结果

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableTest { 
    public static void main(String[] args) { 
        ExecutorService executor = Executors.newCachedThreadPool(); 
        Task task = new Task(); 
        Future<Integer> result = executor.submit(task); 
        executor.shutdown(); 
          
        try {
            Thread.sleep(1000); 
        } catch (InterruptedException e1) { 
            e1.printStackTrace(); 
        } 
          
        System.out.println("主线程在执行任务"); 
          
        try { 
            System.out.println("task运行结果"+result.get()); 
        } catch (InterruptedException e) { 
            e.printStackTrace(); 
        } catch (ExecutionException e) { 
            e.printStackTrace(); 
        } 
          
        System.out.println("所有任务执行完毕"); 
    }  
} 
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception { 
        System.out.println("子线程在进行计算"); 
        Thread.sleep(3000); 
        int sum = 0; 
        for(int i=0;i<100;i++) 
            sum += i; 
        return sum; 
    } 
}

打印结果:
子线程在进行计算
主线程在执行任务
task运行结果4950
所有任务执行完毕

FutureTask的使用

我们先来看一下FutureTask的实现:

public class FutureTask<V> implements RunnableFuture<V>
   FutureTask类实现了RunnableFuture接口,我们看一下RunnableFuture接口的实现:
public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}
可以看出RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

FutureTask提供了2个构造器:

public FutureTask(Callable<V> callable) {
}
public FutureTask(Runnable runnable, V result) {
}
事实上,FutureTask是Future接口的一个唯一实现类。

public class Test {
    public static void main(String[] args) {
        //第一种方式
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        executor.submit(futureTask);
        executor.shutdown();
         
        //第二种方式,注意这种方式和第一种方式效果是类似的,只不过一个使用的是ExecutorService,一个使用的是Thread
        /*Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        Thread thread = new Thread(futureTask);
        thread.start();*/
         
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
         
        System.out.println("主线程在执行任务");
         
        try {
            System.out.println("task运行结果"+futureTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
         
        System.out.println("所有任务执行完毕");
    }
}
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        System.out.println("子线程在进行计算");
        Thread.sleep(3000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}
 
三、Thread.UncaughtExceptionHandler是Java SE5中的新接口,它允许为每个Thread对象附着一个异常处理器.
此方法会在线程因未捕获的异常而临近死亡时被调用。

import java.util.concurrent.*;

class ExceptionThread2 implements Runnable {
  public void run() {
    Thread t = Thread.currentThread();
    System.out.println("run() by " + t);
    System.out.println("eh = " + t.getUncaughtExceptionHandler());
    throw new RuntimeException();
  }
}

class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
  public void uncaughtException(Thread t, Throwable e) {
    System.out.println("caught " + e);
  }
}

class HandlerThreadFactory implements ThreadFactory {
  public Thread newThread(Runnable r) {
    System.out.println(this + " creating new Thread");
    Thread t = new Thread(r);
    System.out.println("created " + t);
    t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
    System.out.println("eh = " + t.getUncaughtExceptionHandler());
    return t;
  }
}

public class CaptureUncaughtException {
  public static void main(String[] args) {
    ExecutorService exec = Executors.newCachedThreadPool(new HandlerThreadFactory());
    exec.execute(new ExceptionThread2());
    exec.shutdown();
  }
}
四、ForkJoin线程池

ForkJoin 是jdk 7提供的并行计算框架,它提供了一种机制,可以把一个大的任务分解成若干个小的任务(fork方法),每个小任务通过单独的线程去处理,
等执行完成后,通过join方法合并起来。

package forJoin;

import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class CountTask extends RecursiveTask<Long>{//继承递归Task
    private static final int THRESHOLD = 10000;
    private long start;
    private long end;
    
    public CountTask(long start,long end){
        this.start=start;
        this.end=end;
    }
    
    public Long compute(){
        long sum=0;
        boolean canCompute = (end-start)<THRESHOLD;
        if(canCompute){
            for(long i=start;i<=end;i++){
                sum +=i;
            }
        }else{
            long step=(start+end)/100;
            ArrayList<CountTask> subTasks=new ArrayList<CountTask>();
            long pos=start;
            for(int i=0;i<100;i++){
                long lastOne=pos+step;//20000
                if(lastOne>end)lastOne=end;
                CountTask subTask=new CountTask(pos,lastOne);
                pos+=step+1;
                subTasks.add(subTask);
                subTask.fork();//小任务单线程提交
            }
            for(CountTask  t:subTasks){
                sum+=t.join();//获取每个小任务的结果值,并合并起来。
            }
        }
        return sum;
    }
    
    public static void main(String[]args){
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        CountTask task = new CountTask(0,20000L);
        ForkJoinTask<Long> result = forkJoinPool.submit(task);
        try{
            long res = result.get();
            System.out.println("sum="+res);
        }catch(InterruptedException e){
            e.printStackTrace();
        }catch(ExecutionException e){
            e.printStackTrace();
        }
    }
}
 
五、

ReentrantLock(sychronized的加强)
特性:

可重入
可中断:在等待过程中,可以中断等待。(lock.lockInterruptibly())
可限时:可以实现一段时间内没有获取到对象锁时,做一些额外的操作。(lock1.tryLock(timeout, unit))
公平锁:当释放对象锁时,可以按照请求顺序将锁释放给等待的对象。(new ReentrantLock(true),即代表着先请求,先得到。造成吞吐量下降,性能上下降)

Lock的优点:

显式的Lock对象在加锁和释放锁方面,相对于内建的synchronized锁来说,赋予了更细粒度的控制力。
ReentrantLock允许你尝试获取锁但最终未获取锁,那么可以去执行其他一些事情,而不是等待直到
这个锁被释放,在尝试获取锁上可以通过TimeUnit类来指定等待时间。

Condition (wait和notify的加强,必须跟ReentrantLock关联)
await()方法会使当前线程等待,同时释放当前锁,
当其他线程中使用signal()时或者signalAll()方法时,
线程会重新获得锁并继续执行。或者当线程被中断时,也能跳出等待。
这和Object.wait()方法很相似。
awaitUninterruptibly()方法与await()方法基本相同,但是它并不会再等待过程中响应中断。
singal()方法用于唤醒一个在等待中的线程。相对的singalAll()方法会唤醒所有在等待中的线程。这和Obejct.notify()方法很类似
此方法可以指定解锁的condition对象。


public class ReenterLock implements Runnable{
	public static ReentrantLock lock=new ReentrantLock();
	public static int i=0;
	public void run() {
		for(int j=0;j<10000000;j++){
			lock.lock();		//对i++进行加锁
			try{
				i++;
			}finally{
				lock.unlock();  //执行完后,进行解锁。
			}
		}
	}
	public static void main(String[] args) throws InterruptedException {
		ReenterLock tl=new ReenterLock();
		Thread t1=new Thread(tl);
		Thread t2=new Thread(tl);
		t1.start();t2.start();
		t1.join();t2.join();
		System.out.println(i);
	}
}
//可重入锁,当锁两次,也需要解锁两次。
public void run() {
		for(int j=0;j<10000000;j++){
			lock.lock();
			lock.lock();
			try{
				i++;
			}finally{
				lock.unlock();
				lock.unlock();
      }
    }
  }

public class ReenterLockCondition implements Runnable{
  public static ReentrantLock lock=new ReentrantLock();
  public static Condition condition = lock.newCondition();//condition要肯定要根据lock进行关联
  public void run() {
    try {
      lock.lock();
      condition.await();
      System.out.println("Thread is going on");
    } catch (InterruptedException e) {
      e.printStackTrace();
    }finally{
      lock.unlock();
    }
  }
  public static void main(String[] args) throws InterruptedException {
    ReenterLockCondition tl=new ReenterLockCondition();
    Thread t1=new Thread(tl);
    t1.start();
    Thread.sleep(2000);
    lock.lock();  //
    condition.signal();
    lock.unlock(); //对象 解锁后,上面的run方法才能继续执行,
  }
 
六、
Java.util.concurrent.locks.ReadWriteLock有一种高级的线程锁机制,它允许多个线程读某个资源,但每次只允许一个线程来写。 
这种想法是,多个线程可以对共享的资源进行读操作,而且不会发生并发问题。
并发问题发生在并发的读取和写入共享资源时或者是多个线程并发写入的情况。


Read Lock 
如果没有写入线程锁住ReadWriteLock,并且没有线程需要获得写入锁进行写入操作。那么多个线程可以获得锁来进行读操作。
Write Lock 
如果没有线程在写或者读操作,那么一次仅有一个线程可以获得锁以进行写操作

ReentrantReadWriteLock支持锁升级操作
锁降级:从写锁变成读锁;
锁升级:从读锁变成写锁。

简单的示例:
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
readWriteLock.readLock().lock();
    // multiple readers can enter this section
    // if not locked for writing, and not writers waiting
    // to lock for writing.

readWriteLock.readLock().unlock();

readWriteLock.writeLock().lock();
    // only one writer can enter this section,
    // and only if no threads are currently reading.

readWriteLock.writeLock().unlock();

示例:
import java.util.Random;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockTest {
	public static void main(String[] args) {
		final Queue3 q3 = new Queue3();
		for (int i = 0; i < 3; i++) {
			new Thread() {
				public void run() {
					while (true) {
						q3.get();
					}
				}
			}.start();
		}
		for (int i = 0; i < 3; i++) {
			new Thread() {
				public void run() {
					while (true) {
						q3.put(new Random().nextInt(10000));
					}
				}
			}.start();
		}
	}
}

class Queue3 {
	private Object data = null;// 共享数据,只能有一个线程能写该数据,但可以有多个线程同时读该数据。
	private ReadWriteLock rwl = new ReentrantReadWriteLock();
	public void get() {
		rwl.readLock().lock();// 上读锁,其他线程只能读不能写
		System.out.println(Thread.currentThread().getName() + " be ready to read data!");
		try {
			Thread.sleep((long) (Math.random() * 1000));
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println(Thread.currentThread().getName() + "have read data :" + data);
		rwl.readLock().unlock(); // 释放读锁,最好放在finnaly里面
	}

	public void put(Object data) {
		rwl.writeLock().lock();// 上写锁,不允许其他线程读也不允许写
		System.out.println(Thread.currentThread().getName() + " be ready to write data!");
		try {
			Thread.sleep((long) (Math.random() * 1000));
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		this.data = data;
		System.out.println(Thread.currentThread().getName() + " have write data: " + data);
		rwl.writeLock().unlock();// 释放写锁
	}
}

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics