Ways for Threads to Communicate

2021.01.14 11:01 11

Shared helper code:


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
/**
 * Created by Edison Xu on 2017/3/2.
 */
public enum Helper {
 
    instance;
 
    private static final ExecutorService tPool = Executors.newFixedThreadPool(2);
 
    public static String[] buildNoArr(int max) {
        String[] noArr = new String[max];
        for(int i=0;i<max;i++){
            noArr[i] = Integer.toString(i+1);
        }
        return noArr;
    }
 
    public static String[] buildCharArr(int max) {
        String[] charArr = new String[max];
        int tmp = 65;
        for(int i=0;i<max;i++){
            charArr[i] = String.valueOf((char)(tmp+i));
        }
        return charArr;
    }
 
    public static void print(String... input){
        if(input==null)
            return;
        for(String each:input){
            System.out.print(each);
        }
    }
 
    public void run(Runnable r){
        tPool.submit(r);
    }
 
    public void shutdown(){
        tPool.shutdown();
    }
 
}

1. First approach

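This approach tries several ways of implementing inter-thread communication. The variants differ in small details, but they share one idea: a shared variable controls whose turn it is. In every example one thread prints the numbers 1 to 52, two at a time, and the other prints the letters A to Z, one at a time, so the combined output reads 12A34B...5152Z.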
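
The target output can be written down with a plain single-threaded loop, which gives a reference string to compare the threaded versions against. This is only a sketch for checking results; the class name ExpectedOutput is hypothetical and not part of the original code.

```java
// Single-threaded reference for the output the threaded versions aim to produce.
// ExpectedOutput is a hypothetical helper, not part of the original article.
class ExpectedOutput {

    // Builds "12A34B...5152Z": two consecutive numbers, then one letter, 26 times.
    static String build() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 26; i++) {
            sb.append(2 * i + 1).append(2 * i + 2); // the two numbers of this round
            sb.append((char) ('A' + i));            // the letter of this round
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(build());
    }
}
```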

① Using the basic synchronized, notify, and wait


public class MethodOne {
    private final ThreadToGo threadToGo = new ThreadToGo();
    public Runnable newThreadOne() {
        final String[] inputArr = Helper.buildNoArr(52);
        return new Runnable() {
            private String[] arr = inputArr;
            public void run() {
                try {
                    for (int i = 0; i < arr.length; i=i+2) {
                        synchronized (threadToGo) {
                            while (threadToGo.value == 2)
                                threadToGo.wait();
                            Helper.print(arr[i], arr[i + 1]);
                            threadToGo.value = 2;
                            threadToGo.notify();
                        }
                    }
                } catch (InterruptedException e) {
                    System.out.println("Oops...");
                }
            }
        };
    }
    public Runnable newThreadTwo() {
        final String[] inputArr = Helper.buildCharArr(26);
        return new Runnable() {
            private String[] arr = inputArr;
            public void run() {
                try {
                    for (int i = 0; i < arr.length; i++) {
                        synchronized (threadToGo) {
                            while (threadToGo.value == 1)
                                threadToGo.wait();
                            Helper.print(arr[i]);
                            threadToGo.value = 1;
                            threadToGo.notify();
                        }
                    }
                } catch (InterruptedException e) {
                    System.out.println("Oops...");
                }
            }
        };
    }
    class ThreadToGo {
        int value = 1;
    }
    public static void main(String args[]) throws InterruptedException {
        MethodOne one = new MethodOne();
        Helper.instance.run(one.newThreadOne());
        Helper.instance.run(one.newThreadTwo());
        Helper.instance.shutdown();
    }
}

② Using Lock and Condition

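import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MethodTwo {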
    private Lock lock = new ReentrantLock(true);
    private Condition condition = lock.newCondition();
    private final ThreadToGo threadToGo = new ThreadToGo();
    public Runnable newThreadOne() {
        final String[] inputArr = Helper.buildNoArr(52);
        return new Runnable() {
            private String[] arr = inputArr;
            public void run() {
                for (int i = 0; i < arr.length; i=i+2) {
                    // Acquire before the try block so that finally never
                    // unlocks a lock this thread does not hold.
                    lock.lock();
                    try {
                        while(threadToGo.value == 2)
                            condition.await();
                        Helper.print(arr[i], arr[i + 1]);
                        threadToGo.value = 2;
                        condition.signal();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        lock.unlock();
                    }
                }
            }
        };
    }
    public Runnable newThreadTwo() {
        final String[] inputArr = Helper.buildCharArr(26);
        return new Runnable() {
            private String[] arr = inputArr;
            public void run() {
                for (int i = 0; i < arr.length; i++) {
                    // Acquire before the try block so that finally never
                    // unlocks a lock this thread does not hold.
                    lock.lock();
                    try {
                        while(threadToGo.value == 1)
                            condition.await();
                        Helper.print(arr[i]);
                        threadToGo.value = 1;
                        condition.signal();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        lock.unlock();
                    }
                }
            }
        };
    }
    class ThreadToGo {
        int value = 1;
    }
    public static void main(String args[]) throws InterruptedException {
        MethodTwo two = new MethodTwo();
        Helper.instance.run(two.newThreadOne());
        Helper.instance.run(two.newThreadTwo());
        Helper.instance.shutdown();
    }
}
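
A variant worth considering, sketched here under assumptions rather than taken from the code above: with one Condition per thread, each signal wakes exactly the thread whose turn comes next. The class, field, and method names are hypothetical, and the output is collected in a StringBuilder instead of being printed through Helper so that the sketch is self-contained.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical variant: one Condition per thread instead of a single shared one.
class TwoConditions {
    private final Lock lock = new ReentrantLock();
    private final Condition numbersTurn = lock.newCondition();
    private final Condition lettersTurn = lock.newCondition();
    private final StringBuilder out = new StringBuilder(); // guarded by lock
    private int turn = 1; // 1 = numbers thread, 2 = letters thread; guarded by lock

    private void numbers() {
        for (int i = 1; i <= 52; i += 2) {
            lock.lock();
            try {
                while (turn != 1) numbersTurn.await(); // loop guards against spurious wakeups
                out.append(i).append(i + 1);
                turn = 2;
                lettersTurn.signal(); // wake only the letters thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } finally {
                lock.unlock();
            }
        }
    }

    private void letters() {
        for (char c = 'A'; c <= 'Z'; c++) {
            lock.lock();
            try {
                while (turn != 2) lettersTurn.await();
                out.append(c);
                turn = 1;
                numbersTurn.signal(); // wake only the numbers thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } finally {
                lock.unlock();
            }
        }
    }

    // Runs both threads to completion and returns the combined output.
    static String run() {
        TwoConditions tc = new TwoConditions();
        Thread a = new Thread(tc::numbers);
        Thread b = new Thread(tc::letters);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return tc.out.toString(); // safe to read: join() orders it after both threads
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With only two threads a single Condition works just as well; separate conditions mainly pay off once more than two threads take turns.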

③ Using volatile:

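In the usual simplified model, the value of a volatile variable lives in main memory: threads read and write it there directly instead of working on a thread-local copy, as they may with ordinary variables. volatile therefore guarantees visibility across threads: once the variable has been modified, any thread that reads it afterwards sees the latest value. The guarantee covers only the field that carries the modifier, and it does not make compound operations such as value++ atomic.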


public class MethodThree {
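    private final ThreadToGo threadToGo = new ThreadToGo();
    class ThreadToGo {
        // volatile belongs on the field the threads actually read and write;
        // marking only the reference volatile does not cover this field.
        volatile int value = 1;
    }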
    public Runnable newThreadOne() {
        final String[] inputArr = Helper.buildNoArr(52);
        return new Runnable() {
            private String[] arr = inputArr;
            public void run() {
                for (int i = 0; i < arr.length; i=i+2) {
                    while(threadToGo.value==2){}
                    Helper.print(arr[i], arr[i + 1]);
                    threadToGo.value=2;
                }
            }
        };
    }
    public Runnable newThreadTwo() {
        final String[] inputArr = Helper.buildCharArr(26);
        return new Runnable() {
            private String[] arr = inputArr;
            public void run() {
                for (int i = 0; i < arr.length; i++) {
                    while(threadToGo.value==1){}
                    Helper.print(arr[i]);
                    threadToGo.value=1;
                }
            }
        };
    }
    public static void main(String args[]) throws InterruptedException {
        MethodThree three = new MethodThree();
        Helper.instance.run(three.newThreadOne());
        Helper.instance.run(three.newThreadTwo());
        Helper.instance.shutdown();
    }
}
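
The visibility guarantee described for volatile can be isolated in a much smaller sketch: a worker spins on a flag until another thread sets it. This is an illustration under assumptions, not part of the original solution; the names VolatileFlag, stop, and workerStops are hypothetical, and the five-second join timeout is an arbitrary safety margin.

```java
// Hypothetical minimal illustration of volatile visibility.
class VolatileFlag {
    // The modifier sits on the very field both threads access. Without it,
    // the worker would not be guaranteed to ever observe the write below.
    private static volatile boolean stop = false;

    // Starts a spinning worker, sets the flag, and reports whether the worker ended.
    static boolean workerStops() {
        stop = false;
        Thread worker = new Thread(() -> {
            while (!stop) {
                // busy-wait, as in the spin loops of the turn-taking example
            }
        });
        worker.start();
        stop = true; // volatile write, visible to the worker's next read
        try {
            worker.join(5000); // wait at most five seconds
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("worker stopped: " + workerStops());
    }
}
```

Busy-waiting keeps a core occupied while the thread waits, which is the price this style pays for avoiding locks.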

④ Using AtomicInteger


import java.util.concurrent.atomic.AtomicInteger;

public class MethodFive {
    private AtomicInteger threadToGo = new AtomicInteger(1);
    public Runnable newThreadOne() {
        final String[] inputArr = Helper.buildNoArr(52);
        return new Runnable() {
            private String[] arr = inputArr;
            public void run() {
                for (int i = 0; i < arr.length; i=i+2) {
                    while(threadToGo.get()==2){}
                    Helper.print(arr[i], arr[i + 1]);
                    threadToGo.set(2);
                }
            }
        };
    }
    public Runnable newThreadTwo() {
        final String[] inputArr = Helper.buildCharArr(26);
        return new Runnable() {
            private String[] arr = inputArr;
            public void run() {
                for (int i = 0; i < arr.length; i++) {
                    while(threadToGo.get()==1){}
                    Helper.print(arr[i]);
                    threadToGo.set(1);
                }
            }
        };
    }
    public static void main(String args[]) throws InterruptedException {
        MethodFive five = new MethodFive();
        Helper.instance.run(five.newThreadOne());
        Helper.instance.run(five.newThreadTwo());
        Helper.instance.shutdown();
    }
}

2. Second approach

This one uses the CyclicBarrier API.

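CyclicBarrier lets a group of threads wait until every one of them has reached the barrier (by calling await()) and then continue together. Once the waiting threads have been released the barrier can be reused, hence "cyclic".
The CyclicBarrier class provides two constructors: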

public CyclicBarrier(int parties, Runnable barrierAction) {
}
public CyclicBarrier(int parties) {
}

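import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class MethodFour{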
      private final CyclicBarrier barrier;
      private final List<String> list;
      public MethodFour() {
          list = Collections.synchronizedList(new ArrayList<String>());
          barrier = new CyclicBarrier(2,newBarrierAction());
      }
      public Runnable newThreadOne() {
          final String[] inputArr = Helper.buildNoArr(52);
          return new Runnable() {
              private String[] arr = inputArr;
              public void run() {
                  for (int i = 0; i < arr.length; i=i+2) {
                      try {
                          list.add(arr[i]);
                          list.add(arr[i+1]);
                          barrier.await();
                      } catch (InterruptedException | BrokenBarrierException e) {
                          e.printStackTrace();
                      }
                  }
              }
          };
      }
      public Runnable newThreadTwo() {
          final String[] inputArr = Helper.buildCharArr(26);
          return new Runnable() {
              private String[] arr = inputArr;
              public void run() {
                  for (int i = 0; i < arr.length; i++) {
                      try {
                          list.add(arr[i]);
                          barrier.await();
                      } catch (InterruptedException | BrokenBarrierException e) {
                          e.printStackTrace();
                      }
                  }
              }
          };
      }
      private Runnable newBarrierAction(){
          return new Runnable() {
              @Override
              public void run() {
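                  // Numbers first, in numeric order, then the letter. A plain
                  // lexicographic sort would put "10" before "9".
                  list.sort((a, b) -> {
                      boolean da = Character.isDigit(a.charAt(0));
                      boolean db = Character.isDigit(b.charAt(0));
                      if (da != db) return da ? -1 : 1;
                      return da ? Integer.compare(Integer.parseInt(a), Integer.parseInt(b))
                                : a.compareTo(b);
                  });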
                  list.forEach(c->System.out.print(c));
                  list.clear();
              }
          };
      }
      public static void main(String args[]){
          MethodFour four = new MethodFour();
          Helper.instance.run(four.newThreadOne());
          Helper.instance.run(four.newThreadTwo());
          Helper.instance.shutdown();
      }
}

 

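One more remark: under the hood this API still relies on Lock and Condition. The threads simply compete for the lock inside the CyclicBarrier instance. When the barrier trips, barrierAction does not run on a separate thread; it runs on one of the participating threads, namely the last one to arrive at the barrier, before the other threads are released.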
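
Both properties, reuse across rounds and the barrier action running on a participating thread, can be observed with a short sketch. It is an illustration under assumptions: the class name BarrierReuse, the thread names party-A and party-B, and the round count are hypothetical.

```java
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;

// Hypothetical illustration: one barrier, reused for several rounds.
class BarrierReuse {

    // Runs the given number of rounds with two party threads and returns the
    // name of the thread that executed the barrier action in each round.
    static List<String> run(int rounds) {
        List<String> actionThreads = new CopyOnWriteArrayList<>();
        CyclicBarrier barrier = new CyclicBarrier(2,
                () -> actionThreads.add(Thread.currentThread().getName()));

        Runnable party = () -> {
            try {
                for (int i = 0; i < rounds; i++) {
                    barrier.await(); // the same barrier instance serves every round
                }
            } catch (InterruptedException | BrokenBarrierException e) {
                throw new IllegalStateException(e);
            }
        };

        Thread a = new Thread(party, "party-A");
        Thread b = new Thread(party, "party-B");
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return actionThreads;
    }

    public static void main(String[] args) {
        // One entry per round; which party appears depends on arrival order.
        System.out.println(run(3));
    }
}
```

The list ends up with exactly one entry per round, and every entry names one of the two party threads rather than a thread owned by the barrier.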

 
