Java 8 Streams: How Parallel Streams Relate to Spliterator

    xiaoxiao  2023-11-04

    Using Java 8 parallel streams

    Computing 1 + 2 + 3 + ... + 1000 on a single thread in Java 8:

    import java.util.stream.IntStream;

    public class DemoTest {
        public static void main(String[] args) {
            IntStream stream = IntStream.rangeClosed(1, 1000);
            int sum = stream.sum();
            System.out.println(sum);
        }
    }

    Output:

    500500

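    To run the computation in parallel, it is enough to call parallel() on the stream:

    import java.util.stream.IntStream;

    public class DemoTest {
        public static void main(String[] args) {
            IntStream stream = IntStream.rangeClosed(1, 1000);
            // parallel() switches the pipeline to parallel execution
            int sum = stream.parallel().sum();
            System.out.println(sum);
        }
    }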

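    Enabling this takes almost no extra code, so from the caller's point of view the parallelism is essentially free; whether it actually runs faster depends on the amount of data and the cost per element. The underlying code is similar to the fork/join framework introduced in JDK 7:

    https://blog.csdn.net/dengjili/article/details/90547583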
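
To make the comparison with fork/join concrete, here is a minimal sketch of the same sum written as an explicit fork/join task. The class names `ForkJoinSumSketch` and `SumTask` and the `THRESHOLD` value are assumptions chosen for illustration; this is not how the stream library is implemented internally, only the same divide-and-combine idea written by hand.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinSumSketch {

    // Hypothetical task: sums the integers in [from, to] by recursive halving.
    static class SumTask extends RecursiveTask<Integer> {
        private static final int THRESHOLD = 100; // assumed cut-off for sequential work
        private final int from;
        private final int to;

        SumTask(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Integer compute() {
            // Small ranges are summed directly
            if (to - from < THRESHOLD) {
                int sum = 0;
                for (int i = from; i <= to; i++) {
                    sum += i;
                }
                return sum;
            }
            int mid = (from + to) >>> 1;
            SumTask left = new SumTask(from, mid);
            SumTask right = new SumTask(mid + 1, to);
            left.fork();                    // run the left half asynchronously
            int rightSum = right.compute(); // compute the right half in this thread
            return left.join() + rightSum;  // wait for the left half and combine
        }
    }

    public static int sum(int from, int to) {
        return ForkJoinPool.commonPool().invoke(new SumTask(from, to));
    }

    public static void main(String[] args) {
        System.out.println(sum(1, 1000));
    }
}
```

Compared with the one-line `parallel()` call, all of the splitting and combining has to be spelled out here, which is the sense in which the stream version is "free".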

    Implementing the parallelism explicitly relies on the Spliterator interface, which is explained next.

    Spliterator: the splittable iterator

    The main methods of the Spliterator interface are:

    public interface Spliterator<T> {
        boolean tryAdvance(Consumer<? super T> action);
        Spliterator<T> trySplit();
        long estimateSize();
        int characteristics();
    }
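
Before the word-count example, it may help to see these methods called on a Spliterator that the JDK already provides. The sketch below uses the Spliterator of an `ArrayList`; the class name `SpliteratorBasicsSketch` and the helper `drain` are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

public class SpliteratorBasicsSketch {

    // Hypothetical helper: consumes a spliterator element by element with tryAdvance.
    public static List<Integer> drain(Spliterator<Integer> spliterator) {
        List<Integer> out = new ArrayList<>();
        while (spliterator.tryAdvance(out::add)) {
            // tryAdvance returns false once no element is left
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
        Spliterator<Integer> rest = source.spliterator();
        System.out.println("size before split: " + rest.estimateSize());

        // trySplit hands part of the elements to a new Spliterator, or returns null
        Spliterator<Integer> prefix = rest.trySplit();
        if (prefix != null) {
            System.out.println("prefix: " + drain(prefix));
        }
        System.out.println("rest:   " + drain(rest));
        System.out.println("SIZED:  " + source.spliterator().hasCharacteristics(Spliterator.SIZED));
    }
}
```

The two parts together still cover every element in the original order, which is what allows a parallel stream to process them on different threads and combine the partial results afterwards.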

    The Spliterator interface is hard to understand without a concrete example. The example used here counts the words in a piece of text, building up from the simplest version to the most involved one.

    Plain iterative version

    Define a method that counts the words:

    public class WordCounterNormal {
        public static void main(String[] args) {
            final String SENTENCE = " Nel mezzo del cammin di nostra vita "
                    + "mi ritrovai in una selva oscura"
                    + " ché la dritta via era smarrita ";
            System.out.println("Found " + countWordsIteratively(SENTENCE) + " words");
        }

        public static int countWordsIteratively(String s) {
            int counter = 0;
            boolean lastSpace = true;
            for (char c : s.toCharArray()) {
                if (Character.isWhitespace(c)) {
                    lastSpace = true;
                } else {
                    // A non-whitespace character right after whitespace starts a new word
                    if (lastSpace) {
                        counter++;
                    }
                    lastSpace = false;
                }
            }
            return counter;
        }
    }

    Output:

    Found 19 words

    Rewriting it with Java 8

    The counting logic is wrapped in a class designed around the three-argument Stream.reduce method:

    <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner)

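    The three parameters mean:

    identity: the initial value of the reduction
    accumulator: folds one more element into a partial result
    combiner: merges two partial results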
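
As a small illustration of the three roles, separate from the word counter, the sketch below sums the lengths of a few words. The class name `ReduceThreeArgsSketch` and the method `totalLength` are assumptions made for this example.

```java
import java.util.Arrays;
import java.util.List;

public class ReduceThreeArgsSketch {

    // Sums the lengths of the given words with the three-argument reduce.
    public static int totalLength(List<String> words, boolean parallel) {
        return (parallel ? words.parallelStream() : words.stream())
                .reduce(0,                                        // identity: initial value
                        (length, word) -> length + word.length(), // accumulator: add one word
                        Integer::sum);                            // combiner: merge partial sums
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("Nel", "mezzo", "del", "cammin");
        System.out.println(totalLength(words, false)); // 17
        System.out.println(totalLength(words, true));  // 17
    }
}
```

The combiner only matters when the stream runs in parallel: each chunk is reduced on its own starting from the identity, and the partial results are then merged pairwise.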

    Implementation

    public class WordCounter {
        private final int counter;
        private final boolean lastSpace;

        // 1. Initial value
        public WordCounter(int counter, boolean lastSpace) {
            this.counter = counter;
            this.lastSpace = lastSpace;
        }

        // 2. Accumulate one character
        public WordCounter accumulate(Character c) {
            if (Character.isWhitespace(c)) {
                return lastSpace ? this : new WordCounter(counter, true);
            } else {
                return lastSpace ? new WordCounter(counter + 1, false) : this;
            }
        }

        // 3. Combine two partial results
        public WordCounter combine(WordCounter wordCounter) {
            return new WordCounter(counter + wordCounter.counter, wordCounter.lastSpace);
        }

        public int getCounter() {
            return counter;
        }
    }

    Usage

    import java.util.stream.IntStream;
    import java.util.stream.Stream;

    public class WordCounterTest {
        public static void main(String[] args) {
            final String SENTENCE = " Nel mezzo del cammin di nostra vita "
                    + "mi ritrovai in una selva oscura"
                    + " ché la dritta via era smarrita ";
            Stream<Character> stream = IntStream.range(0, SENTENCE.length())
                    .mapToObj(SENTENCE::charAt);
            WordCounter wordCounter = stream.reduce(new WordCounter(0, true),
                    WordCounter::accumulate, WordCounter::combine);
            System.out.println(wordCounter.getCounter());
        }
    }

    Output:

    19

    Parallelizing with a Spliterator

    A parallel stream splits its source at arbitrary positions, so a word that is cut in two would be counted twice. A custom Spliterator avoids this by splitting only at whitespace.

    import java.util.Spliterator;
    import java.util.function.Consumer;

    class WordCounterSpliterator implements Spliterator<Character> {
        private final String string;
        private int currentChar = 0;

        public WordCounterSpliterator(String string) {
            this.string = string;
        }

        /**
         * tryAdvance behaves like an ordinary Iterator: it consumes the elements
         * one at a time, in order, and returns true when an element was consumed.
         * trySplit is specific to Spliterator: it hands part of the elements to a
         * second Spliterator (the one it returns) so both can run in parallel.
         */
        @Override
        public boolean tryAdvance(Consumer<? super Character> action) {
            // Nothing left to traverse
            if (currentChar >= string.length()) {
                return false;
            }
            // Consume the current character
            action.accept(string.charAt(currentChar++));
            return true;
        }

        @Override
        public Spliterator<Character> trySplit() {
            int currentSize = string.length() - currentChar;
            // Returning null means the remaining String is small enough to process sequentially
            if (currentSize < 10) {
                return null;
            }
            // Start looking for a split position in the middle of the remaining String
            for (int splitPos = currentSize / 2 + currentChar; splitPos < string.length(); splitPos++) {
                // Advance the split position to the next whitespace
                if (Character.isWhitespace(string.charAt(splitPos))) {
                    // New WordCounterSpliterator for the part from the current position up to the split position
                    Spliterator<Character> spliterator =
                            new WordCounterSpliterator(string.substring(currentChar, splitPos));
                    // This WordCounterSpliterator continues from the split position
                    currentChar = splitPos;
                    return spliterator;
                }
            }
            return null;
        }

        // Number of elements still to traverse
        @Override
        public long estimateSize() {
            return string.length() - currentChar;
        }

        // An int encoding the set of characteristics of this Spliterator
        @Override
        public int characteristics() {
            return ORDERED | SIZED | SUBSIZED | NONNULL | IMMUTABLE;
        }
    }
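
The heart of trySplit is the search for a split position that falls on whitespace. The sketch below isolates just that search on a plain String so its effect is easy to see; `WhitespaceSplitSketch` and `findSplitPosition` are hypothetical names, and the helper always starts from index 0 rather than from a current position.

```java
public class WhitespaceSplitSketch {

    // Hypothetical helper: index of the first whitespace at or after the
    // middle of s, or -1 when there is none.
    public static int findSplitPosition(String s) {
        for (int pos = s.length() / 2; pos < s.length(); pos++) {
            if (Character.isWhitespace(s.charAt(pos))) {
                return pos;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        String text = "mi ritrovai in una selva oscura";
        int pos = findSplitPosition(text);
        if (pos >= 0) {
            // Both parts contain only whole words
            System.out.println("[" + text.substring(0, pos) + "]");
            System.out.println("[" + text.substring(pos) + "]");
        }
    }
}
```

Here the middle of the text falls inside "una", so the position moves forward to the space after it and the text is divided into "mi ritrovai in una" and " selva oscura".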

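    The abstract characteristics() method returns a combination of the following flags:

    Characteristic  Meaning
    ORDERED         Elements have a defined order (for example, a List), so the Spliterator follows that order when traversing and splitting.
    DISTINCT        For any pair of traversed elements x and y, x.equals(y) returns false.
    SORTED          Traversed elements follow a predefined sort order.
    SIZED           The Spliterator was created from a source of known size (for example, a Set), so estimateSize() returns an exact value.
    NONNULL         Traversed elements are guaranteed not to be null.
    IMMUTABLE       The source of the Spliterator cannot be modified: no element can be added, removed, or changed during traversal.
    CONCURRENT      The source of the Spliterator may be safely modified by other threads concurrently without synchronization.
    SUBSIZED        The Spliterator and all Spliterators split from it are SIZED.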
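
To see these flags on real sources, the following sketch queries the Spliterators of a few standard collections with hasCharacteristics. The class name `CharacteristicsSketch` and the helper `describe` are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Spliterator;
import java.util.TreeSet;

public class CharacteristicsSketch {

    // Hypothetical helper: lists the names of the flags a spliterator reports.
    public static List<String> describe(Spliterator<?> s) {
        List<String> names = new ArrayList<>();
        if (s.hasCharacteristics(Spliterator.ORDERED)) names.add("ORDERED");
        if (s.hasCharacteristics(Spliterator.DISTINCT)) names.add("DISTINCT");
        if (s.hasCharacteristics(Spliterator.SORTED)) names.add("SORTED");
        if (s.hasCharacteristics(Spliterator.SIZED)) names.add("SIZED");
        if (s.hasCharacteristics(Spliterator.NONNULL)) names.add("NONNULL");
        if (s.hasCharacteristics(Spliterator.IMMUTABLE)) names.add("IMMUTABLE");
        if (s.hasCharacteristics(Spliterator.CONCURRENT)) names.add("CONCURRENT");
        if (s.hasCharacteristics(Spliterator.SUBSIZED)) names.add("SUBSIZED");
        return names;
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(3, 1, 2);
        System.out.println("ArrayList: " + describe(new ArrayList<>(values).spliterator()));
        System.out.println("HashSet:   " + describe(new HashSet<>(values).spliterator()));
        System.out.println("TreeSet:   " + describe(new TreeSet<>(values).spliterator()));
    }
}
```

The stream implementation can use these flags to choose cheaper strategies, for example skipping work that a characteristic already guarantees.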

    Test class

    import java.util.Spliterator;
    import java.util.stream.Stream;
    import java.util.stream.StreamSupport;

    public class WordCounterSpliteratorTest {
        public static void main(String[] args) {
            final String SENTENCE = " Nel mezzo del cammin di nostra vita "
                    + "mi ritrovai in una selva oscura"
                    + " ché la dritta via era smarrita ";
            Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE);
            // The second argument (true) requests a parallel stream
            Stream<Character> stream = StreamSupport.stream(spliterator, true);
            WordCounter wordCounter = stream.reduce(new WordCounter(0, true),
                    WordCounter::accumulate, WordCounter::combine);
            System.out.println(wordCounter.getCounter());
        }
    }

    Output:

    19

    At this point the word count runs in parallel and still produces the correct result.

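    Calling parallel() directly

    Calling parallel() costs almost nothing to write, but here it is not equivalent to the Spliterator version: the index range is split at arbitrary positions, so a word that is cut in two can be counted twice and the printed result will typically be larger than 19.

    import java.util.stream.IntStream;
    import java.util.stream.Stream;

    // The original shows only the main method; the class name is added so the code compiles
    public class WordCounterParallelTest {
        public static void main(String[] args) {
            final String SENTENCE = " Nel mezzo del cammin di nostra vita "
                    + "mi ritrovai in una selva oscura"
                    + " ché la dritta via era smarrita ";
            Stream<Character> stream = IntStream.range(0, SENTENCE.length()).parallel()
                    .mapToObj(SENTENCE::charAt);
            WordCounter wordCounter = stream.reduce(new WordCounter(0, true),
                    WordCounter::accumulate, WordCounter::combine);
            System.out.println(wordCounter.getCounter());
        }
    }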
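
One point worth checking with the parallel() version is what happens when a chunk boundary falls inside a word. The sketch below reproduces that situation sequentially and deterministically by counting the two halves of a short text separately and adding the results, which is what the combiner does. `MidWordSplitSketch` and `countWords` are hypothetical names; `countWords` repeats the iterative counting logic from the plain version above.

```java
public class MidWordSplitSketch {

    // Same logic as the iterative word count: a word starts at each
    // non-whitespace character that follows whitespace (or the start).
    public static int countWords(String s) {
        int counter = 0;
        boolean lastSpace = true;
        for (char c : s.toCharArray()) {
            if (Character.isWhitespace(c)) {
                lastSpace = true;
            } else {
                if (lastSpace) {
                    counter++;
                }
                lastSpace = false;
            }
        }
        return counter;
    }

    public static void main(String[] args) {
        String text = "selva oscura";

        // Split inside "oscura", as an index-based split may do
        int midWord = countWords(text.substring(0, 8)) + countWords(text.substring(8));
        // Split at the space, as WordCounterSpliterator does
        int atSpace = countWords(text.substring(0, 5)) + countWords(text.substring(5));

        System.out.println(countWords(text)); // 2
        System.out.println(midWord);          // 3: "os" and "cura" are counted as two words
        System.out.println(atSpace);          // 2
    }
}
```

This is why the split position matters for this particular reduction: the per-chunk counts can only be added safely when every chunk starts at a word boundary.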