大数据

Java基础知识 — 多线程

  1. 多线程的概念

    线程是指一个任务从头到尾的执行流,线程提供了一个运行的机制。

    Java中,一个程序中可以并发的启动多个线程,这也就意味着线程可以在多处理器系统上同一时刻运行。

    多线程可以使程序反应更快,执行效率更高。

    1. 多线程编程

以上介绍的概念可能还不够清晰的解释什么是多线程,没关系,我们举一个例子看一下。当然,如果我们想要创建一个多线程程序,那么首先我们应该提供多个任务供我们去执行,想要创建一个这样的任务,我们需要实现 Runnable接口,打开Runnable 源码如下

public interface Runnable {
    public abstract void run();
}

很简单,其中只是包含了一个 run()的抽象方法,只要我们在子类中去实现它就可以了。

创建一个打印 “ I am task a!” 的任务

public class A implements Runnable{

    @Override
    public void run() {
        System.out.println("I am task a !");
    }

}

基于同样的原理,实现一个打印 “ I am task b !” 和一个 “I am task c !”的方法,代码同上。然后在主函数中去开启线程,实现打印任务。

主函数实现如下:

public class Main {
    public static void main(String[] args){
        int i = 0;
        while(50 >= i){

            Thread threadA = new Thread(new A());
            Thread threadB = new Thread(new B());
            Thread threadC = new Thread(new C());

            threadA.start();
            threadB.start();
            threadC.start();

            i++;
        }
    }

}

在主函数中我们开启了这三个线程,去实现打印任务,按照我们平常的理解,该程序肯定是依次执行A,B,C任务,但是打印结果却是:

I am task b !
I am task c !
I am task a !
I am task c !
I am task a !
I am task b !
I am task b !
I am task a !
I am task c !
I am task b !

怎么样,看到这里,你应该知道什么是多线程了,但是你肯定又有一个疑问,难道我们每次开线程都要去new Thread()吗?

当然不是,这种方法对于执行单个任务而言确实很方便,但是当我们大量的任务时,这种方法显然是十分繁琐的,而且会降低性能,所以我们要引出另一个概念,也就是线程池。

3 . 线程池

Java提供了Executor接口来执行线程池中的任务,提供ExecutorServive接口来管理和控制任务。下面我们来看看ExecutorExecutorService这两个接口。

public interface Executor {
     void execute(Runnable command);
}

可见,Executor接口只是提供了一个 execute()方法,通过其函数,我们应该可以得出这个方法是我们执行任务时使用的一个函数,类似于 Thread类中的start()方法。

public interface ExecutorService extends Executor {
    void shutdown(); //关闭执行器,但是允许完成当前任务。一旦关闭,无法再接收新的任务
    List shutdownNow(); //强制关闭执行器,就算当前线程池中存在尚未完成的任务,仍然立即关闭

    boolean isShutdown();//如果执行器已经关闭,则返回 true
    boolean isTerminated(); //如果线程池中的任务都被终止,则返回true
}

当然,ExecutorService中还包含其他方法,以上列出的只是几个常用的方法,可以看到,ExecutorService 是继承Executor接口的,但是这些都只是接口,并没有提供具体的实现方法,我们该如何利用线程池开启我们的任务呢?当然,还存在一个 Executors类,这个类中实现了创建线程池的方法,主要有两种方式

//创建一个线程池,该线程池可并发执行的线程数固定不变。
+ newFixedThreadPool(numberOfThreads : int) : ExecutorService

//创建一个线程池,可以按需要创建新线程,也即线程数可变

+ newCachedThreadPool():ExecutorService

下面我就用线程池的方法,实现A,B,C任务。

public class Main {
    public static void main(String[] args){
        int i = 0;
        ExecutorService executor = Executors.newFixedThreadPool(3);
        while(10 > i){

            executor.execute(new A());
            executor.execute(new B());
            executor.execute(new C());
            executor.shutdown();
            i++;
        }
    }

}

学过计算机组成原理的人可能知道指令流水线设计,影响流水线性能的有三大原因,其中有一点就是数据相关,也就是由于重叠操作,改变了对操作数读写的访问顺序,从而导致了数据相关冲突。在多线程里面,会不会发生这种情况呢? 答案是肯定的,下面我们来举一个例子:

设计一个IPhoneShop 类

public class IPhoneShop {

    private static int number = 3;
    public static void sellIPhone(){
        if(0 < number){

            try {
                Thread.sleep(5);    
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            number --;
            System.out.println("Sell a IPhone !");
        } else {
            System.out.println("Sell out !");
        }
    }
}

类的设计十分简单,初始化店内只有3台IPhone,如果卖出去一部,就打印卖出一部IPhone,如IPhone数量为0,则打印已经卖完了,在sellIPhone()方法中,我设置了线程睡眠 5 毫秒,是为了增大差异,更明显的看出线程的竞争状态。

这时的主函数为

public class Main {
    public static IPhoneShop mShop = new IPhoneShop();
    public static void main(String[] args){
        int i = 1;
        ExecutorService executor = Executors.newFixedThreadPool(100);
        while(100 >= i){
            executor.execute(new A());
            i++;
        }

        executor.shutdown();
    }

}

A类

public class A implements Runnable{

    @Override
    public void run() {
        Main.mShop.sellIPhone();
    }

}

执行之后你会发现有意思的事情

I got a IPhone !
I got a IPhone !
Sell out !
I got a IPhone !
I got a IPhone !
I got a IPhone !
I got a IPhone !
I got a IPhone !
Sell out !
Sell out !
Sell out !
I got a IPhone !
I got a IPhone !
I got a IPhone !
I got a IPhone !
I got a IPhone !

这里只是粘贴了部分结果,显然,已经超出了IPhone的数量,这时为什么呢?当一个线程通过 sellIPhone()的 if 判断时,我们设置了线程睡眠,这个时候,IPhone的数量并没有减少,而另一个线程又进来了,这个时候 if 判断也就失去了其意义,因为在number数量还未来得及减少的时候,很多子线程已经通过了if判断。

为了解决这个问题,Java 引入了synchronized 关键字,这个关键字用于同步线程,以便一次只有一个线程能够访问这个方法。也就是说,当一个线程进入这个方法后,这个方法便被加了锁,别的线程只能够选择等待,直到前一个进入的线程执行完为止,这样就解决了线程间的竞争问题。

public static synchronized void sellIPhone()

只需要这样更改,便不会出现以上问题了,这也是synchronized 关键字的简单用法。但是synchronized 是隐式加锁,具体实现我们并看不到,那么有没有一种方法,能够显式给方法加锁呢?

4.使用 Lock 加锁同步

首先来介绍一下Lock 接口,看看定义

public interface Lock {

     void lock();

     void lockInterruptibly() throws InterruptedException;
     boolean tryLock();
     boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
     void unlock();
     Condition newCondition()
}

我们主要能够用到lock(), unlock(), newCondition()三个方法,顾名思义,lock()用于加锁,unlock()用于解锁, newCondition()返回绑定到Lock实例的新的Condition 实例。下面就让我们使用Lock方法来进行方法的显式加锁。

public class IPhoneShop {

    private static int number = 3;
    private static Lock mLock = new ReentrantLock();

    public static void sellIPhone(){
        mLock.lock();
        if(0 < number){

            try {
                Thread.sleep(5);

            } catch (InterruptedException e) {

                e.printStackTrace();
            }
            number --;
            System.out.println("I got a IPhone !");

        } else {
            System.out.println("Sell out !");
        }

        mLock.unlock();

    }
}

这样也就实现了和synchronized一样的效果,这样也显得更加灵活和直观。

想象一下刚才卖IPhone 的场景,如果在卖的同时,又进了新货,可不可以让那么没有没到IPhone的用户先等待一下,等到进新货时,由店家通知所有没有买到IPhone的用户,我们又进了新货,你们可以来买了。

这就涉及到了线程间的协作问题,还记得Lock 接口中的那个 newCondition()方法吗?Condition接口中存在三个重要的方法,也就是await() ,signal() , signalAll() 方法。await()方法可以让当前线程都处于等待状态,signal() 方法可以唤醒一个等待的线程,signalAll()方法可以唤醒所有的等待线程.

public class IPhoneShop {

    private static int number = 3;
    private static Lock mLock = new ReentrantLock();
    private static Condition newCondition = mLock.newCondition();

    public static void sellIPhone(){
        mLock.lock();
        if(0 < number){
            number --;
            System.out.println("I got a IPhone !");

        } else {
            System.out.println("Sell out !");
            try {
                newCondition.await();
            } catch (InterruptedException e) {

                e.printStackTrace();
            }
        }

        mLock.unlock();

    }

    public static void getIPhone(){
        mLock.lock();
        number += 3;
        newCondition.signalAll();
        mLock.unlock();
    }
}

这样,当IPhone售完后,所有线程将进入等待阶段,直到再次进货,newCondition 唤醒所有的等待线程,再次进入销售环节。这样就实现了使用Condition线程间的协作。

5.信号量

信号量可以用于限制访问共享资源的线程数,在访问资源前,必须获得信号的许可,所以,使用这种方法,也可以达到给方法加锁的目的,其效果和synchronized关键字和Lock是类似的。

使用信号量的方法为

Semaphore mSemaphore = new Semaphore(numberOfPermits:int);
或者
Semaphore mSemaphore = new Semaphore(Permits:int,fairs:boolean);//可以创建一种具有公平策略的信号量,所谓公平策略也就是说等待时间最长的线程获得方法的使用权

Semaphore类中有两个重要方法,acquire()release() 两个方法,第一个方法表示这个线程已经获得了方法的使用权,如果获得使用权的线程数等于指定的数,那么其他线程将不能再获得方法的使用权。release()用于线程释放使用权。

具体使用如下

Semaphore mSemaphore = new Semaphore(2);

public void method(){
try {
    mSemaphore.acquire();
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally{
    mSemaphore.release();
    }
}