线程同步工具(六)控制并发阶段性任务的改变

    xiaoxiao2023-12-18  170

    声明:本文是《 Java 7 Concurrency Cookbook 》的第三章, 作者: Javier Fernández González 译者:郑玉婷

    控制并发阶段性任务的改变

    Phaser 类提供每次phaser改变阶段都会执行的方法。它是 onAdvance() 方法。它接收2个参数:当前阶段数和注册的参与者数;它返回 Boolean 值,如果phaser继续它的执行,则为 false;否则为真,即phaser结束运行并进入 termination 状态。

    如果注册参与者为0,此方法的默认的实现值为真,要不然就是false。如果你扩展Phaser类并覆盖此方法,那么你可以修改它的行为。通常,当你要从一个phase到另一个,来执行一些行动时,你会对这么做感兴趣的。

    在这个指南,你将学习如何控制phaser的 phase的改变,通过实现自定义版本的 Phaser类并覆盖 onAdvance() 方法来执行一些每个phase 都会改变的行动。你将要实现一个模拟测验,有些学生要完成他们的练习。全部的学生都必须完成同一个练习才能继续下一个练习。

    准备

    指南中的例子是使用Eclipse IDE 来实现的。如果你使用Eclipse 或者其他的IDE,例如NetBeans, 打开并创建一个新的java项目。

    怎么做呢

    按照这些步骤来实现下面的例子::

    001 package tool; 002  003 import java.util.Date; 004 import java.util.concurrent.Phaser; 005 import java.util.concurrent.TimeUnit; 006  007//1.   创建一个类,名为 MyPhaser,并特别的扩展 Phaser 类。 008 public class MyPhaser extends Phaser { 009  010     // 2. 覆盖 onAdvance() 方法。根据 phase 的属性的值,我们将调用不同的辅助方法。如果 phase 等于 0,调用 011     // studentsArrived() 方法;又如果 phase 等于 1,调用 finishFirstExercise() 方法;又如果 phase 012     // 等于 2,调用 finishSecondExercise() 方法;再如果 phase 等于 3,调用 finishExam() 013     // 方法。否则,返回真值,表示phaser已经终结。 014     @Override 015     protected boolean onAdvance(int phase, int registeredParties) { 016         switch (phase) { 017         case 0: 018             return studentsArrived(); 019         case 1: 020             return finishFirstExercise(); 021         case 2: 022             return finishSecondExercise(); 023         case 3: 024             return finishExam(); 025         default: 026             return true; 027         } 028     } 029  030     // 3. 实现辅助方法 studentsArrived()。它在操控台写2条信息,并返回false值来表明phaser将继续执行。 031     private boolean studentsArrived() { 032         System.out.printf("Phaser: The exam are going to start. The students are ready.\n"); 033         System.out.printf("Phaser: We have %d students.\n", 034                 getRegisteredParties()); 035         return false; 036     } 037  038     // 4. 实现辅助方法 finishFirstExercise()。它在操控台写2条信息,并返回false值来表明phaser将继续执行。 039     private boolean finishFirstExercise() { 040         System.out.printf("Phaser: All the students have finished the first exercise.\n"); 041         System.out.printf("Phaser: It's time for the second one.\n"); 042         return false; 043     } 044  045     // 5. 实现辅助方法 finishSecondExercise()。它在操控台写2条信息,并返回false值来表明phaser将继续执行。 046     private boolean finishSecondExercise() { 047         System.out.printf("Phaser: All the students have finished the second exercise.\n"); 048         System.out.printf("Phaser: It's time for the third one.\n"); 049         return false; 050     } 051  052     // 6. 实现辅助方法 finishExam()。它在操控台写2条信息,并返回false值来表明phaser将继续执行。 053     private boolean finishExam() { 054         System.out.printf("Phaser: All the students have finished the exam.\n"); 055         System.out.printf("Phaser: Thank you for your time.\n"); 056         return true; 057     } 058  059     // 7. 创建一个类,名为 Student,并一定实现 Runnable 接口。这个类将模拟测验的学生。 060     public class Student implements Runnable { 061  062         // 8. 声明 a Phaser 对象,名为 phaser. 063         private Phaser phaser; 064  065         // 9. 实现类的构造函数,初始 Phaser 对象。 066         public Student(Phaser phaser) { 067             this.phaser = phaser; 068         } 069  070         // 10. 实现 run() 方法,模拟真实测验。 071         @Override 072         public void run() { 073  074             // 11. 首先,方法写一条信息到操控台表明学生到达考场并调用 phaser 的 arriveAndAwaitAdvance() 075             // 方法来等待其他线程们。 076             System.out.printf("%s: Has arrived to do the exam. %s\n", Thread 077                     .currentThread().getName(), new Date()); 078             phaser.arriveAndAwaitAdvance(); 079  080             // 12. 然后,写信息到操控台,调用私有 doExercise1() 方法模拟第一场测验,写另一条信息到操控台并调用 phaser 081             // 的 arriveAndAwaitAdvance() 方法来等待其他学生结束第一场测验。 082             System.out.printf("%s: Is going to do the first exercise. %s\n", 083                     Thread.currentThread().getName(), new Date()); 084             doExercise1(); 085             System.out.printf("%s: Has done the first exercise. %s\n", Thread 086                     .currentThread().getName(), new Date()); 087             phaser.arriveAndAwaitAdvance(); 088  089             // 13. 为第二场和第三场实现相同的代码。 090             System.out.printf("%s: Is going to do the second exercise.%s\n", 091                     Thread.currentThread().getName(), new Date()); 092             doExercise2(); 093             System.out.printf("%s: Has done the second exercise. %s\n", Thread 094                     .currentThread().getName(), new Date()); 095             phaser.arriveAndAwaitAdvance(); 096             System.out.printf("%s: Is going to do the third exercise. %s\n", 097                     Thread.currentThread().getName(), new Date()); 098             doExercise3(); 099             System.out.printf("%s: Has finished the exam. %s\n", Thread 100                     .currentThread().getName(), new Date()); 101             phaser.arriveAndAwaitAdvance(); 102         } 103  104         // 14. 实现辅助方法 doExercise1()。此方法让线程随机休眠一段时间。 105         private void doExercise1() { 106             try { 107                 long duration = (long) (Math.random() * 10); 108                 TimeUnit.SECONDS.sleep(duration); 109             } catch (InterruptedException e) { 110                 e.printStackTrace(); 111             } 112         } 113  114         // 15. 实现辅助方法 doExercise2()。此方法让线程随机休眠一段时间。 115         private void doExercise2() { 116             try { 117                 long duration = (long) (Math.random() * 10); 118                 TimeUnit.SECONDS.sleep(duration); 119             } catch (InterruptedException e) { 120                 e.printStackTrace(); 121             } 122         } 123  124         // 16. 实现辅助方法 doExercise3()。此方法让线程随机休眠一段时间。 125         private void doExercise3() { 126             try { 127                 long duration = (long) (Math.random() * 10); 128                 TimeUnit.SECONDS.sleep(duration); 129             } catch (InterruptedException e) { 130                 e.printStackTrace(); 131             } 132         } 133     } 134}

    实现例子的main类,创建名为Main的类并添加main() 方法。

    01 package tool; 02  03 import tool.MyPhaser.Student; 04  05//17.  实现例子的main类,创建名为Main的类并添加main() 方法。 06 public class Main { 07  08     public static void main(String[] args) { 09  10         // 18. 创建 MyPhaser对象。 11         MyPhaser phaser = new MyPhaser(); 12  13         // 19. 创建5个 Student 对象并使用register()方法在phaser中注册他们。 14         MyPhaser.Student students[] = new Student[5]; 15         for (int i = 0; i < students.length; i++) { 16             students[i] = phaser.new Student(phaser); 17             phaser.register(); 18         } 19  20         // 20. 创建5个线程来运行students并开始它们。 21         Thread threads[] = new Thread[students.length]; 22         for (int i = 0; i < students.length; i++) { 23             threads[i] = new Thread(students[i], "Student " + i); 24             threads[i].start(); 25         } 26  27         // 21. 等待5个线程的终结。 28         for (int i = 0; i < threads.length; i++) { 29             try { 30                 threads[i].join(); 31             } catch (InterruptedException e) { 32                 e.printStackTrace(); 33             } 34         } 35  36         // 22. 调用isTerminated()方法来写一条信息表明phaser是在termination状态。 37         System.out.printf("Main: The phaser has finished: %s.\n", 38                 phaser.isTerminated()); 39     } 40}

    它是怎么工作的…

    这个练习模拟了有3个测验的真实测试。全部的学生必须都完成同一个测试才能开始下一个测试。为了实现这个必须使用同步,我们使用了Phaser类,但是你实现了你自己的phaser通过扩展原来的类,并覆盖onAdvance() 方法.

    在阶段改变之前和在唤醒 arriveAndAwaitAdvance() 方法中休眠的全部线程们之前,此方法被 phaser 调用。这个方法接收当前阶段数作为参数,0是第一个phase ,还有注册的参与者数。最有用的参数是actual phase。如果你要基于不同的当前阶段执行不同的操作,那么你必须使用选择性结构(if/else 或 switch)来选择你想执行的操作。例子里,我们使用了 switch 结构来为每个phase的改变选择不同的方法。

    onAdvance() 方法返回 Boolean 值表明 phaser 终结与否。如果返回 false 值,表示它还没有终结,那么线程将继续执行其他phases。如果phaser 返回真值,那么phaser将叫醒全部待定的线程们,并且转移phaser到terminated 状态,所以之后的任何对phaser的方法的调用都会被立刻返回,还有isTerminated() 方法将返回真值。

    在核心类,当你创建 MyPhaser 对象,在phaser中你不用表示参与者的数量。你为每个 Student 对象调用了 register() 方法创建了phaser的参与者的注册。这个调用不会在Student 对象或者执行它的线程与phaser之间这个建立任何关系。 说真的,phaser的参与者数就是个数字而已。phaser与参与者之间没有任何关系。

    下面的裁图展示了例子的执行结果:

    你可以发现学生们结束第一个练习的时间是不同的。当全部都结束练习时,phaser 调用onAdvance() 方法写信息到操控台,接着全部的学生在同一时间开始第二场测试。

    参见

    第三章,线程同步应用:运行并发阶段性任务 第八章,测试并发应用:监控 Phaser

    文章转自 并发编程网-ifeve.com

    相关资源:敏捷开发V1.0.pptx
    最新回复(0)