Sprievodca knižnicou paralelných zberateľov Java

1. Úvod

Parallel-collectors je malá knižnica, ktorá poskytuje sadu kolektorov rozhrania Java Stream API, ktoré umožňujú paralelné spracovanie - a zároveň obchádzajú hlavné nedostatky štandardných paralelných streamov.

2. Maven závislosti

Ak chceme začať používať knižnicu, musíme pridať jednu položku do Mavenu pom.xml spis:

 com.pivovarit paralelné kolektory 1.1.0 

Alebo jeden riadok v zostave súboru Gradle:

kompilujte 'com.pivovarit: paralelné kolektory: 1.1.0'

Najnovšiu verziu nájdete na serveri Maven Central.

3. Upozornenia na paralelné prúdy

Paralelné prúdy boli jedným z vrcholov Java 8, ale ukázalo sa, že sú použiteľné výlučne pre ťažké spracovanie procesora.

Dôvodom bola skutočnosť, že Paralelné toky boli vnútorne podporované zdieľaním v rámci celého JVM ForkJoinPool, ktorá poskytovala obmedzený paralelizmus a používali ho všetky paralelné toky bežiace na jednej inštancii JVM.

Predstavte si napríklad, že máme zoznam ID a chceme ich použiť na načítanie zoznamu používateľov a že táto operácia je drahá.

Na to by sme mohli použiť paralelné prúdy:

Zoznam id = Arrays.asList (1, 2, 3); Zoznam výsledkov = ids.parallelStream () .map (i -> fetchById (i)) // každá operácia trvá jednu sekundu .collect (Collectors.toList ()); System.out.println (výsledky); // [user-1, user-2, user-3]

A skutočne vidíme, že je tu badateľné zrýchlenie. Ale stane sa problematické, ak začneme prevádzkovať viac paralelných blokujúcich operácií ... paralelne. To môže rýchlo nasýtiť bazén a viesť k potenciálne obrovským latenciám. Preto je dôležité vytvárať priedely vytváraním samostatných oblastí vlákien - aby sa zabránilo vzájomnému ovplyvňovaniu vzájomných úloh.

S cieľom poskytnúť zvyk ForkJoinPool mohli by sme využiť trik opísaný tu, ale tento prístup sa spoliehal na neregistrovaný hack a bol chybný až do JDK10. Viac sa dočítame v samotnom čísle - [JDK8190974].

4. Paralelné zberače v akcii

Paralelné zberače, ako už názov napovedá, sú iba štandardné zberače Stream API, ktoré umožňujú paralelné vykonávanie ďalších operácií zbierať () fáza.

ParallelCollectors (ktoré zrkadlí Zberatelia trieda) trieda je fasáda poskytujúca prístup k celej funkčnosti knižnice.

Ak by sme chceli zopakovať vyššie uvedený príklad, mohli by sme jednoducho napísať:

ExecutorService vykonávateľ = Executors.newFixedThreadPool (10); Zoznam id = Arrays.asList (1, 2, 3); CompletableFuture results = ids.stream () .collect (ParallelCollectors.parallelToList (i -> fetchById (i), vykonávateľ, 4)); System.out.println (results.join ()); // [user-1, user-2, user-3]

Výsledok je však rovnaký boli sme schopní poskytnúť náš vlastný fond vlákien, určiť našu vlastnú úroveň paralelnosti a výsledok sa dostavil zabalený do a CompletableFuture inštancia bez blokovania aktuálneho vlákna.

Na druhej strane štandardné paralelné prúdy nemohli dosiahnuť nič z toho.

4.1. ParallelCollectors.parallelToList / ToSet ()

Akokoľvek intuitívne to bude, ak chceme spracovať a Prúd paralelne a zhromaždiť výsledky do a Zoznam alebo Nastaviť, môžeme jednoducho použiť ParallelCollectors.parallelToList alebo parallelToSet:

Zoznam id = Arrays.asList (1, 2, 3); Zoznam výsledkov = ids.stream () .collect (parallelToList (i -> fetchById (i), exekútor, 4)) .join ();

4.2. ParallelCollectors.parallelToMap ()

Ak chceme zbierať Prúd prvky do a Mapa inštancia, rovnako ako pri Stream API, musíme poskytnúť dvoch mapovačov:

Zoznam id = Arrays.asList (1, 2, 3); Výsledky mapy = ids.stream () .collect (parallelToMap (i -> i, i -> fetchById (i), exekútor, 4)) .join (); // {1 = user-1, 2 = user-2, 3 = user-3}

Môžeme poskytnúť aj zvyk Mapa inštancia Dodávateľ:

Výsledky mapy = ids.stream () .collect (parallelToMap (i -> i, i -> fetchById (i), TreeMap :: new, vykonávateľ, 4)) .join (); 

A stratégia riešenia konfliktov na mieru:

Zoznam id = Arrays.asList (1, 2, 3); Výsledky mapy = ids.stream () .collect (parallelToMap (i -> i, i -> fetchById (i), TreeMap :: new, (s1, s2) -> s1, exekútor, 4)) .join ();

4.3. ParallelCollectors.parallelToCollection ()

Podobne ako je uvedené vyššie, môžeme odovzdať náš zvyk Dodávateľ zbierky ak chceme získať výsledky zabalené v našom vlastnom kontajneri:

Zoznam výsledkov = ids.stream () .collect (parallelToCollection (i -> fetchById (i), LinkedList :: new, vykonávateľ, 4)) .join ();

4.4. ParallelCollectors.parallelToStream ()

Ak to nestačí, môžeme skutočne získať a Prúd inštanciu a tam pokračovať vo vlastnom spracovaní:

Mapa results = ids.stream () .collect (parallelToStream (i -> fetchById (i), executor, 4)) .thenApply (stream -> stream.collect (Collectors.groupingBy (i -> i.length ()))) .pripojiť sa ();

4.5. ParallelCollectors.parallel ()

Ten nám umožňuje streamovať výsledky v poradí dokončenia:

ids.stream () .collect (paralelný (i -> fetchByIdWithRandomDelay (i), exekútor, 4)) .forEach (System.out :: println); // user-1 // user-3 // user-2 

V tomto prípade môžeme očakávať, že kolektor vráti zakaždým iné výsledky, pretože sme zaviedli oneskorenie náhodného spracovania.

4.6. ParallelCollectors.parallelOrdered ()

Toto zariadenie umožňuje streamovanie výsledkov rovnako ako vyššie uvedené, ale zachováva pôvodné poradie:

ids.stream () .collect (parallelOrdered (i -> fetchByIdWithRandomDelay (i), exekútor, 4)) .forEach (System.out :: println); // user-1 // user-2 // user-3 

V takom prípade bude zberateľ vždy udržiavať poradie, ale môže byť pomalší ako vyššie uvedené.

5. Obmedzenia

V čase písania tohto článku paralelné kolektory nepracujú s nekonečnými prúdmi aj keď sa použijú operácie skratu - je to konštrukčné obmedzenie uložené internými funkciami Stream API. Jednoducho povedané, Prúds zaobchádza s kolektormi ako s operáciami, ktoré ich neskratujú, takže prúd musí pred ukončením spracovať všetky prvky proti prúdu.

Ďalším obmedzením je to operácie skratu neruší zvyšné úlohy po skratovaní.

6. Záver

Videli sme, ako nám knižnica paralelných kolektorov umožňuje vykonávať paralelné spracovanie pomocou vlastného rozhrania Java Stream API Zberatelia a CompletableFutures využívať vlastné súbory vlákien, paralelizmus a neblokujúci štýl CompletableFutures.

Útržky kódu sú ako vždy k dispozícii na GitHub.

Ďalšie čítanie nájdete v knižnici paralelných kolektorov na GitHub, autorskom blogu a autorovom twitterovom účte.