Všetky vyššie uvedené príklady predstavujú problém nájdenia najkratšej cesty medzi dvoma vrcholmi v grafe (trasa alebo cesta medzi dvoma miestami, množstvo akcií alebo zložitosť získania listu papiera z jedného alebo druhého miesta). Táto trieda problémov s najkratšou cestou sa nazýva SSSP (Single Source Shortest Path) a základným algoritmom na riešenie týchto problémov je , ktorý má výpočtovú zložitosť O(n^2)
.
Niekedy však potrebujeme nájsť všetky najkratšie cesty medzi všetkými vrcholmi. Zvážte nasledujúci príklad: vytvárate pre vás mapu pravidelných pohybov medzi domovom , prácou a divadlom . V tomto scenári budete mať 6 trás: work ⭢ home
, home ⭢ work
, work ⭢ theatre
, theatre ⭢ work
, home ⭢ theatre
a theatre ⭢ home
(opačné trasy sa môžu líšiť napríklad z dôvodu jednosmerných ciest) .
A(k, n) = n! / (n - m)! // where n - is a number of elements, // k - is a number of elements in permutation (in our case k = 2)
Čo nám dáva 12 trás pre 4 miesta a 90 trás pre 10 miest (čo je pôsobivé). Poznámka... toto sa neberie do úvahy medziľahlé body medzi miestami, tj aby ste sa dostali z domu do práce, musíte prejsť 4 ulice, prejsť popri rieke a prejsť cez most. Ak si predstavíme, niektoré cesty môžu mať spoločné medziľahlé body... no... výsledkom bude veľmi veľký graf s množstvom vrcholov, kde každý vrchol bude predstavovať buď miesto, alebo významný medziľahlý bod. Trieda problémov, kde potrebujeme nájsť všetky najkratšie cesty medzi všetkými pármi vrcholov v grafe, sa nazýva APSP (All Pairs Shortest Paths) a základným algoritmom na riešenie týchto problémov je , ktorý má O(n^3)
výpočtová zložitosť.
Floyd-Warshallov algoritmus nájde všetky najkratšie cesty medzi každým párom vrcholov v grafe. Algoritmy publikoval Robert Floyd v [1] (ďalšie podrobnosti nájdete v časti „Odkazy“). V tom istom roku Peter Ingerman v [2] opísal modernú implementáciu algoritmu vo forme troch vnorených slučiek for
:
algorithm FloydWarshall(W) do for k = 0 to N - 1 do for i = 0 to N - 1 do for j = 0 to N - 1 do W[i, j] = min(W[i, j], W[i, k] + W[k, j]) end for end for end for end algorithm // where W - is a weight matrix of N x N size, // min() - is a function which returns lesser of it's arguments
Ak ste nikdy nezmenili prácu s grafom reprezentovaným vo forme matice, môže byť ťažké pochopiť, čo robí vyššie uvedený algoritmus. Aby sme sa uistili, že sme na rovnakej stránke, pozrime sa na to, ako môže byť graf reprezentovaný vo forme matice a prečo je takáto reprezentácia užitočná pri riešení problému s najkratšou cestou.
Obrázok nižšie ilustruje graf 5 vrcholov. Vľavo je graf prezentovaný vo vizuálnej forme, ktorá pozostáva z kruhov a šípok, kde každý kruh predstavuje vrchol a šípka predstavuje hranu so smerom. Číslo vo vnútri kruhu zodpovedá číslu vrcholu a číslo nad okrajom zodpovedá hmotnosti hrany. Na pravej strane je rovnaký graf prezentovaný vo forme matice váh. Váhová matica je forma , kde každá bunka matice obsahuje „váhu“ – vzdialenosť medzi vrcholom i
(riadok) a vrcholom j
(stĺpec). Váhová matica neobsahuje informácie o „ceste“ medzi vrcholmi (zoznam vrcholov, cez ktoré sa dostanete z i
do j
) – iba váhu (vzdialenosť) medzi týmito vrcholmi.
V matici váh môžeme vidieť, že hodnoty buniek sa rovnajú váham medzi vrcholmi. Preto, ak si prezrieme cesty z vrcholu 0
(riadok 0
), uvidíme, že ... existuje len jedna cesta – od 0
do 1
. Ale na vizuálnej reprezentácii môžeme jasne vidieť cesty z vrcholu 0
k vrcholom 2
a 3
(cez vrchol 1
). Dôvod je jednoduchý – v počiatočnom stave obsahuje matica váh iba vzdialenosť medzi susednými vrcholmi. Na nájdenie zvyšku však stačí len táto informácia.
Pozrime sa, ako to funguje. Venujte pozornosť bunke W[0, 1]
. Jeho hodnota udáva, že z vrcholu 0
do vrcholu 1
vedie cesta s váhou rovnajúcou sa 1
(v skratke môžeme zapísať ako: 0 ⭢ 1: 1
). S týmito znalosťami teraz môžeme skenovať všetky bunky riadku 1
(ktorý obsahuje všetky váhy všetkých ciest z vrcholu 1
) a spätne portovať túto informáciu do riadku 0
, čím sa hmotnosť zvýši o 1
(hodnota W[0, 1]
).
Rovnakým postupom môžeme nájsť cesty z vrcholu 0
cez ďalšie vrcholy. Počas vyhľadávania sa môže stať, že k rovnakému vrcholu vedie viac ako jedna cesta a čo je dôležitejšie, váhy týchto ciest môžu byť rôzne. Príklad takejto situácie je znázornený na obrázku nižšie, kde vyhľadávanie od vrcholu 0
po vrchol 2
odhalilo ešte jednu cestu k vrcholu 3
s menšou hmotnosťou.
Máme dve cesty: pôvodnú cestu – 0 ⭢ 3: 4
a novú cestu, ktorú sme práve objavili – 0 ⭢ 2 ⭢ 3: 3
(pamätajte, že matica váh neobsahuje cesty, takže nevieme, ktorú vrcholy sú zahrnuté v pôvodnej ceste). Toto je moment, kedy sa rozhodneme ponechať si najkratšiu a zapíšeme 3
do bunky W[0, 3]
.
algorithm FloydWarshall(W) do for k = 0 to N - 1 do for i = 0 to N - 1 do for j = 0 to N - 1 do W[i, j] = min(W[i, j], W[i, k] + W[k, j]) end for end for end for end algorithm
Najvzdialenejší cyklus for
on k
iteruje cez všetky vrcholy v grafe a v každej iterácii premenná k
predstavuje vrchol , cez ktorý hľadáme cesty . Vnútorný cyklus for
on i
tiež iteruje cez všetky vrcholy v grafe a pri každej iterácii i
predstavuje vrchol, z ktorého hľadáme cesty . A nakoniec najvnútornejší cyklus for
j
iteruje cez všetky vrcholy v grafe a v každej iterácii j
predstavuje vrchol , ku ktorému hľadáme cestu. V kombinácii nám to dáva nasledovné: v každej iterácii k
hľadáme cesty od všetkých vrcholov i
do všetkých vrcholov j
až po vrchol k
. Vo vnútri cyklu porovnávame cestu i ⭢ j
(predstavenú W[i, j]
) s dráhou i ⭢ k ⭢ j
(predstavenú súčtom W[I, k]
a W[k, j]
) a napíšeme najkratšiu jeden späť do W[i, j]
.
Zdrojový kód a experimentálne grafy sú dostupné v na GitHub. Experimentálne grafy možno nájsť v adresári Data/Sparse-Graphs.zip
. Všetky benchmarky v tomto príspevku sú implementované v súbore .
NO_EDGE = (int.MaxValue / 2) - 1
.
Čo sa týka #1. Keď hovoríme o matriciach, bunky definujeme ako „riadky“ a „stĺpce“. Z tohto dôvodu sa zdá prirodzené predstaviť si maticu vo forme „štvorca“ alebo „obdĺžnika“ (obrázok 4a).
i = row * row_size + col; // where row - cell row index, // col - cell column index, // row_size - number of cells in a row.
Lineárne pole váh matice je predpokladom pre efektívne vykonávanie Floyd-Warshallovho algoritmu. Dôvod nie je jednoduchý a podrobné vysvetlenie si zaslúži samostatný príspevok... alebo niekoľko príspevkov. V súčasnosti je však dôležité spomenúť, že takáto reprezentácia výrazne zlepšuje , čo má v skutočnosti veľký vplyv na výkon algoritmu.
- Poznámka autora
Čo sa týka #2. Ak sa bližšie pozriete na pseudokód algoritmu, nenájdete žiadne kontroly týkajúce sa existencie cesty medzi dvoma vrcholmi. Namiesto toho pseudokód jednoducho použije funkciu min()
. Dôvod je jednoduchý – pôvodne, ak medzi vrcholmi neexistuje žiadna cesta, je hodnota bunky nastavená na infinity
a vo všetkých jazykoch, okrem JavaScriptu, sú všetky hodnoty menšie ako infinity
. V prípade celých čísel môže byť lákavé použiť int.MaxValue
ako hodnotu „bez cesty“. To však povedie k pretečeniu celého čísla v prípadoch, keď hodnoty oboch ciest i ⭢ k
a k ⭢ j
budú rovné int.MaxValue
. Preto používame hodnotu, ktorá je o jednu menšia ako polovica int.MaxValue
.
- Premýšľavý čitateľ
Je to skutočne možné, ale nanešťastie to povedie k výraznému zníženiu výkonu. Stručne povedané, CPU vedie štatistiku výsledkov hodnotenia pobočky napr. keď sa niektoré z výrokov if
vyhodnotia ako true
alebo false
. Túto štatistiku používa na spustenie kódu „štatisticky predpovedanej vetvy“ vopred pred vyhodnotením skutočného príkazu if
(toto sa nazýva ), a preto vykoná kód efektívnejšie. Keď je však predikcia CPU nepresná, spôsobuje to značnú stratu výkonu v porovnaní so správnou predikciou a bezpodmienečným vykonávaním, pretože CPU sa musí zastaviť a vypočítať správnu vetvu.
Pretože pri každej iterácii k
aktualizujeme významnú časť matice váh, štatistika vetvy CPU sa stáva zbytočnou, pretože neexistuje vzor kódu, všetky vetvy sú založené výlučne na údajoch. Takže takáto kontrola bude mať za následok značné množstvo .
Uhh, zdá sa, že sme skončili s teoretickou časťou – poďme implementovať algoritmus (túto implementáciu označujeme ako Baseline
):
public void Baseline(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { for (var i = 0; i < sz; ++i) { for (var j = 0; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } } } }
Vyššie uvedený kód je takmer identickou kópiou vyššie uvedeného pseudokódu s jedinou výnimkou – namiesto Math.Min()
používame if
na aktualizáciu bunky len v prípade potreby.
- Premýšľavý čitateľ
Dôvod je jednoduchý. V momente písania JIT vydáva takmer ekvivalentný kód pre implementácie if
aj Math.Min
. Môžete si to skontrolovať podrobne na adrese ale tu sú úryvky tiel hlavných slučiek:
// // == Assembly code of implementation of innermost loop for of j using if. // 53: movsxd r14, r14d // // compare matrix[i * sz + j] and distance (Condition) // 56: cmp [rdx+r14*4+0x10], ebx 5b: jle short 64 // // if matrix[i * sz + j] greater than distance write distance to matrix // 5d: movsxd rbp, ebp 60: mov [rdx+rbp*4+0x10], ebx 64: // end of loop // // == Assembly code of implementation of innermost loop for of j using Math.Min. // 4f: movsxd rbp, ebp 52: mov r14d, [rdx+rbp*4+0x10] // // compare matrix[i * sz + j] and distance (Condition) // 57: cmp r14d, ebx. // // if matrix[i * sz + j] less than distance write value to matrix // 5a: jle short 5e // // otherwise write distance to matrix // 5c: jmp short 61 5e: mov ebx, r14d 61: mov [rdx+rbp*4+0x10], ebx 65: // end of loop
Môžete vidieť, bez ohľadu na to, či používame if
alebo Math.Min
stále existuje podmienená kontrola. Avšak v prípade if
nie je zbytočné písať.
var max = v * (v - 1)) / 2; // where v - is a number of vertexes in a graph.
Poďme rock!
Metóda | Veľkosť | Priemerná | Chyba | StdDev |
---|---|---|---|---|
Základná línia | 300 | 27,525 ms | 0,1937 ms | 0,1617 ms |
Základná línia | 600 | 217,897 ms | 1,6415 ms | 1,5355 ms |
Základná línia | 1200 | 1 763,335 ms | 7,4561 ms | 6,2262 ms |
Základná línia | 2400 | 14 533,335 ms | 63,3518 ms | 52,9016 ms |
Základná línia | 4800 | 119 768,219 ms | 181,5514 ms | 160,9406 ms |
Z výsledkov, ktoré môžeme vidieť, čas výpočtu dramaticky rastie v porovnaní s veľkosťou grafu – pre graf s 300 vrcholmi to trvalo 27 milisekúnd, pre graf s 2400 vrcholmi – 14,5 sekundy a pre graf 4800 – 119 sekúnd, čo je takmer 2 minúty !
V našich experimentoch však používame orientované, acyklické grafy, ktoré majú úžasnú vlastnosť – ak existuje cesta z vrcholu 1
do vrcholu 2
, potom z vrcholu 2
do vrcholu 1
žiadna cesta nevedie. Pre nás to znamená, že existuje veľa nesusediacich vrcholov, ktoré môžeme preskočiť, ak neexistuje cesta z i
do k
(túto implementáciu označujeme ako SpartialOptimisation
).
public void SpartialOptimisation(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { for (var i = 0; i < sz; ++i) { if (matrix[i * sz + k] == NO_EDGE) { continue; } for (var j = 0; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } } } }
Tu sú výsledky predchádzajúcich ( Baseline
) a súčasných ( SpartialOptimisation
) implementácií na rovnakej sade grafov (najrýchlejšie výsledky sú zvýraznené tučným písmom ):
Metóda | Veľkosť | Priemerný | Chyba | StdDev | Pomer |
---|---|---|---|---|---|
Základná línia | 300 | 27,525 ms | 0,1937 ms | 0,1617 ms | 1,00 |
Čiastočná optimalizácia | 300 | 12,399 ms | 0,0943 ms | 0,0882 ms | 0,45 |
Základná línia | 600 | 217,897 ms | 1,6415 ms | 1,5355 ms | 1,00 |
Čiastočná optimalizácia | 600 | 99,122 ms | 0,8230 ms | 0,7698 ms | 0,45 |
Základná línia | 1200 | 1 763,335 ms | 7,4561 ms | 6,2262 ms | 1,00 |
Čiastočná optimalizácia | 1200 | 766,675 ms | 6,1147 ms | 5,7197 ms | 0,43 |
Základná línia | 2400 | 14 533,335 ms | 63,3518 ms | 52,9016 ms | 1,00 |
Čiastočná optimalizácia | 2400 | 6 507,878 ms | 28,2317 ms | 26,4079 ms | 0,45 |
Základná línia | 4800 | 119 768,219 ms | 181,5514 ms | 160,9406 ms | 1,00 |
Čiastočná optimalizácia | 4800 | 55 590,374 ms | 414,6051 ms | 387,8218 ms | 0,46 |
Môj počítač je vybavený procesorom Inter Core i7-7700 CPU 3.60GHz (Kaby Lake)
s 8 logickými procesormi ( ) alebo 4 jadrami s technológiou . Mať viac ako jedno jadro je ako mať viac „náhradných rúk“, ktoré môžeme dať do práce. Pozrime sa teda, ktorú časť práce možno efektívne rozdeliť medzi viacerých pracovníkov a potom ju paralelne uviesť.
Začnime od for of k
cyklu. V každej iteračnej slučke vypočítava cesty z každého vrcholu do každého vrcholu cez vrchol k
. Nové a aktualizované cesty sú uložené v matici váh. Pri pohľade na iterácie si môžete všimnúť – možno ich vykonať v akomkoľvek poradí: 0, 1, 2, 3 alebo 3, 1, 2, 0 bez ovplyvnenia výsledku. Stále sa však musia vykonávať postupne, inak niektoré z iterácií nebudú môcť používať nové alebo aktualizované cesty, pretože ešte nebudú zapísané do matice váh. Takáto nekonzistentnosť určite zničí výsledky. Musíme teda hľadať ďalej.
Ďalším kandidátom je slučka for of i
. V každej iteračnej slučke načíta cestu z vrcholu i
do vrcholu k
(bunka: W[i, k]
), cestu z vrcholu k
do vrcholu j
(bunka: W[k, j
]) a potom skontroluje, či známa cesta z i
do j
(bunka: W[i, j]
) je kratšia ako cesta i ⭢ k ⭢ j
(súčet: W[i, k]
+ W[k, j]
). Ak sa pozriete bližšie na prístupový vzor, môžete si všimnúť – pri každej iterácii i
slučka číta dáta z k
riadku a aktualizovaného i
riadku, čo v podstate znamená – iterácie sú nezávislé a môžu sa vykonávať nielen v akomkoľvek poradí, ale aj paralelne!
Vyzerá to sľubne, tak to poďme implementovať (túto implementáciu označujeme ako SpartialParallelOptimisations
).
for of j
slučka môže byť tiež paralelná. Paralelizácia najvnútornejšieho cyklu je však v tomto prípade veľmi neefektívna. Môžete si to overiť sami vykonaním niekoľkých jednoduchých zmien v .
- Poznámka autora
public void SpartialParallelOptimisations(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { Parallel.For(0, sz, i => { if (matrix[i * sz + k] == NO_EDGE) { return; } for (var j = 0; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } }); } }
Tu sú výsledky implementácií Baseline
, SpartialOptimisation
a SpartialParallelOptimisations
na rovnakej sade grafov (paralelizácia sa vykonáva pomocou triedy ):
Metóda | Veľkosť | Priemerná | Chyba | StdDev | Pomer |
---|---|---|---|---|---|
Základná línia | 300 | 27,525 ms | 0,1937 ms | 0,1617 ms | 1,00 |
Čiastočná optimalizácia | 300 | 12,399 ms | 0,0943 ms | 0,0882 ms | 0,45 |
SpartialParallelOptimizations | 300 | 6,252 ms | 0,0211 ms | 0,0187 ms | 0,23 |
Základná línia | 600 | 217,897 ms | 1,6415 ms | 1,5355 ms | 1,00 |
Čiastočná optimalizácia | 600 | 99,122 ms | 0,8230 ms | 0,7698 ms | 0,45 |
SpartialParallelOptimizations | 600 | 35,825 ms | 0,1003 ms | 0,0837 ms | 0,16 |
Základná línia | 1200 | 1 763,335 ms | 7,4561 ms | 6,2262 ms | 1,00 |
Čiastočná optimalizácia | 1200 | 766,675 ms | 6,1147 ms | 5,7197 ms | 0,43 |
SpartialParallelOptimizations | 1200 | 248,801 ms | 0,6040 ms | 0,5043 ms | 0,14 |
Základná línia | 2400 | 14 533,335 ms | 63,3518 ms | 52,9016 ms | 1,00 |
Čiastočná optimalizácia | 2400 | 6 507,878 ms | 28,2317 ms | 26,4079 ms | 0,45 |
SpartialParallelOptimizations | 2400 | 2 076,403 ms | 20,8320 ms | 19,4863 ms | 0,14 |
Základná línia | 4800 | 119 768,219 ms | 181,5514 ms | 160,9406 ms | 1,00 |
Čiastočná optimalizácia | 4800 | 55 590,374 ms | 414,6051 ms | 387,8218 ms | 0,46 |
SpartialParallelOptimizations | 4800 | 15 614,506 ms | 115,6996 ms | 102,5647 ms | 0,13 |
Z výsledkov môžeme vidieť, že paralelizácia cyklu for of i
skrátila čas výpočtu 2-4 krát v porovnaní s predchádzajúcou implementáciou ( SpartialOptimisation
)! Je to veľmi pôsobivé, ale je dôležité si uvedomiť, že paralelizácia čistých výpočtov spotrebúva všetky dostupné výpočtové zdroje, čo môže viesť k nedostatku zdrojov iných aplikácií v systéme.
var a = new [] { 5, 7, 8, 1 }; var b = new [] { 4, 2, 2, 6 }; var c = new [] { 0, 0, 0, 0 }; for (var i = 0; i < 4; ++i) c[i] = a[i] + b[i];
Priveľmi zjednodušeným spôsobom možno vykonanie iterácie 0
vyššie uvedeného cyklu for
na úrovni CPU znázorniť takto:
Tu je to, čo sa deje. CPU načítava hodnoty polí a
a b
z pamäte do registrov CPU (jeden register môže obsahovať práve jednu hodnotu). Potom CPU vykoná operáciu skalárneho sčítania na týchto registroch a zapíše výsledok späť do hlavnej pamäte – priamo do poľa c
. Tento proces sa opakuje štyrikrát pred koncom slučky.
Tu je to, čo sa deje. CPU načíta hodnoty polí a
a b
z pamäte do registrov CPU (tentoraz však jeden vektorový register môže obsahovať dve hodnoty poľa). Potom CPU vykoná operáciu sčítania vektorov na týchto registroch a zapíše výsledok späť do hlavnej pamäte – priamo do poľa c
. Pretože pracujeme s dvoma hodnotami naraz, tento proces sa namiesto štyroch opakuje dvakrát.
public void SpartialVectorOptimisations(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { for (var i = 0; i < sz; ++i) { if (matrix[i * sz + k] == NO_EDGE) { continue; } var ik_vec = new Vector<int>(matrix[i * sz + k]); var j = 0; for (; j < sz - Vector<int>.Count; j += Vector<int>.Count) { var ij_vec = new Vector<int>(matrix, i * sz + j); var ikj_vec = new Vector<int>(matrix, k * sz + j) + ik_vec; var lt_vec = Vector.LessThan(ij_vec, ikj_vec); if (lt_vec == new Vector<int>(-1)) { continue; } var r_vec = Vector.ConditionalSelect(lt_vec, ij_vec, ikj_vec); r_vec.CopyTo(matrix, i * sz + j); } for (; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } } } }
Vektorizovaný kód môže vyzerať trochu bizarne, tak si ho poďme prejsť krok za krokom.
Vytvoríme vektor cesty i ⭢ k
: var ik_vec = new Vector<int>(matrix[i * sz + k])
. Výsledkom je, že ak vektor môže obsahovať štyri hodnoty typu int
a váha cesty i ⭢ k
je rovná 5, vytvoríme vektor štyroch 5 – [5, 5, 5, 5]. Ďalej pri každej iterácii súčasne vypočítame cesty z vrcholu i
k vrcholom j, j + 1, ..., j + Vector<int>.Count
.
Vlastnosť Vector<int>.Count
vráti počet prvkov typu int
, ktoré sa zmestia do vektorových registrov. Veľkosť vektorových registrov závisí od modelu CPU, takže táto vlastnosť môže vrátiť rôzne hodnoty na rôznych CPU.
- Poznámka autora
ij_vec
a ikj_vec
.ij_vec
a ikj_vec
a vyberte najmenšie hodnoty do nového vektora r_vec
.r_vec
späť do matice váh.
Zatiaľ čo #1 a #3 sú celkom jednoduché, #2 vyžaduje vysvetlenie. Ako už bolo spomenuté, pomocou vektorov manipulujeme s viacerými hodnotami súčasne. Preto výsledok niektorých operácií nemôže byť singulárny, tj porovnávacia operácia Vector.LessThan(ij_vec, ikj_vec)
nemôže vrátiť true
alebo false
, pretože porovnáva viacero hodnôt. Takže namiesto toho vráti nový vektor, ktorý obsahuje výsledky porovnania medzi zodpovedajúcimi hodnotami z vektorov ij_vec
a ikj_vec
( -1
, ak je hodnota z ij_vec
menšia ako hodnota z ikj_vec
a 0
, ak je to inak). Vrátený vektor (samotný o sebe) nie je veľmi užitočný, ale môžeme použiť vektorovú operáciu Vector.ConditionalSelect(lt_vec, ij_vec, ikj_vec)
na extrahovanie požadovaných hodnôt z vektorov ij_vec
a ikj_vec
do nového vektora – r_vec
. Táto operácia vráti nový vektor, kde hodnoty sú rovné menšie ako dve zodpovedajúce hodnoty vstupných vektorov, tj ak je hodnota vektora lt_vec
rovná -1
, potom operácia vyberie hodnotu z ij_vec
, v opačnom prípade vyberie hodnotu z ikj_vec
.
Okrem #2 je tu ešte jedna časť, ktorá si vyžaduje vysvetlenie – druhá, nevektorizovaná slučka. Vyžaduje sa, keď veľkosť matice váh nie je súčinom hodnoty Vector<int>.Count
. V takom prípade hlavná slučka nemôže spracovať všetky hodnoty (pretože nemôžete načítať časť vektora) a druhá, nevektorovaná, slučka vypočíta zostávajúce iterácie.
Metóda | sz | Priemerná | Chyba | StdDev | Pomer |
---|---|---|---|---|---|
Základná línia | 300 | 27,525 ms | 0,1937 ms | 0,1617 ms | 1,00 |
Čiastočná optimalizácia | 300 | 12,399 ms | 0,0943 ms | 0,0882 ms | 0,45 |
SpartialParallelOptimizations | 300 | 6,252 ms | 0,0211 ms | 0,0187 ms | 0,23 |
SpartialVectorOptimizations | 300 | 3,056 ms | 0,0301 ms | 0,0281 ms | 0,11 |
Základná línia | 600 | 217,897 ms | 1,6415 ms | 1,5355 ms | 1,00 |
Čiastočná optimalizácia | 600 | 99,122 ms | 0,8230 ms | 0,7698 ms | 0,45 |
SpartialParallelOptimizations | 600 | 35,825 ms | 0,1003 ms | 0,0837 ms | 0,16 |
SpartialVectorOptimizations | 600 | 24,378 ms | 0,4320 ms | 0,4041 ms | 0,11 |
Základná línia | 1200 | 1 763,335 ms | 7,4561 ms | 6,2262 ms | 1,00 |
Čiastočná optimalizácia | 1200 | 766,675 ms | 6,1147 ms | 5,7197 ms | 0,43 |
SpartialParallelOptimizations | 1200 | 248,801 ms | 0,6040 ms | 0,5043 ms | 0,14 |
SpartialVectorOptimizations | 1200 | 185,628 ms | 2,1240 ms | 1,9868 ms | 0,11 |
Základná línia | 2400 | 14 533,335 ms | 63,3518 ms | 52,9016 ms | 1,00 |
Čiastočná optimalizácia | 2400 | 6 507,878 ms | 28,2317 ms | 26,4079 ms | 0,45 |
SpartialParallelOptimizations | 2400 | 2 076,403 ms | 20,8320 ms | 19,4863 ms | 0,14 |
SpartialVectorOptimizations | 2400 | 2 568,676 ms | 31,7359 ms | 29,6858 ms | 0,18 |
Základná línia | 4800 | 119 768,219 ms | 181,5514 ms | 160,9406 ms | 1,00 |
Čiastočná optimalizácia | 4800 | 55 590,374 ms | 414,6051 ms | 387,8218 ms | 0,46 |
SpartialParallelOptimizations | 4800 | 15 614,506 ms | 115,6996 ms | 102,5647 ms | 0,13 |
SpartialVectorOptimizations | 4800 | 18 257,991 ms | 84,5978 ms | 79,1329 ms | 0,15 |
Z výsledku je zrejmé, že vektorizácia výrazne skrátila čas výpočtu – 3 až 4 krát v porovnaní s neparalelnou verziou ( SpartialOptimisation
). Zaujímavým momentom je, že vektorizovaná verzia tiež prekonáva paralelnú verziu ( SpartialParallelOptimisations
) na menších grafoch (menej ako 2400 vrcholov).
Ak vás zaujíma praktická aplikácia vektorizácie, odporúčam vám prečítať si sériu príspevkov od , kde vektorizoval Array.Sort
(tieto výsledky sa neskôr ocitli v aktualizácii pre Garbage Collector v ).
Posledná implementácia kombinuje úsilie o paralelizáciu a vektorizáciu a... úprimne povedané, chýba jej individualita 🙂 Pretože... no, práve sme nahradili telo Parallel.For
od SpartialParallelOptimisations
vektorizovanou slučkou od SpartialVectorOptimisations
:
public void SpartialParallelVectorOptimisations(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { Parallel.For(0, sz, i => { if (matrix[i * sz + k] == NO_EDGE) { return; } var ik_vec = new Vector<int>(matrix[i * sz + k]); var j = 0; for (; j < sz - Vector<int>.Count; j += Vector<int>.Count) { var ij_vec = new Vector<int>(matrix, i * sz + j); var ikj_vec = new Vector<int>(matrix, k * sz + j) + ik_vec; var lt_vec = Vector.LessThan(ij_vec, ikj_vec); if (lt_vec == new Vector<int>(-1)) { continue; } var r_vec = Vector.ConditionalSelect(lt_vec, ij_vec, ikj_vec); r_vec.CopyTo(matrix, i * sz + j); } for (; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } }); } }
Všetky výsledky tohto príspevku sú uvedené nižšie. Ako sa očakávalo, simultánne použitie paralelizmu a vektorizácie preukázalo najlepšie výsledky, ktoré skrátilo čas výpočtu až 25-krát (pre grafy s 1200 vrcholmi) v porovnaní s počiatočnou implementáciou.
Metóda | sz | Priemerná | Chyba | StdDev | Pomer |
---|---|---|---|---|---|
Základná línia | 300 | 27,525 ms | 0,1937 ms | 0,1617 ms | 1,00 |
Čiastočná optimalizácia | 300 | 12,399 ms | 0,0943 ms | 0,0882 ms | 0,45 |
SpartialParallelOptimizations | 300 | 6,252 ms | 0,0211 ms | 0,0187 ms | 0,23 |
SpartialVectorOptimizations | 300 | 3,056 ms | 0,0301 ms | 0,0281 ms | 0,11 |
SpartialParallelVectorOptimizations | 300 | 3,008 ms | 0,0075 ms | 0,0066 ms | 0,11 |
Základná línia | 600 | 217,897 ms | 1,6415 ms | 1,5355 ms | 1,00 |
Čiastočná optimalizácia | 600 | 99,122 ms | 0,8230 ms | 0,7698 ms | 0,45 |
SpartialParallelOptimizations | 600 | 35,825 ms | 0,1003 ms | 0,0837 ms | 0,16 |
SpartialVectorOptimizations | 600 | 24,378 ms | 0,4320 ms | 0,4041 ms | 0,11 |
SpartialParallelVectorOptimizations | 600 | 13,425 ms | 0,0319 ms | 0,0283 ms | 0,06 |
Základná línia | 1200 | 1 763,335 ms | 7,4561 ms | 6,2262 ms | 1,00 |
Čiastočná optimalizácia | 1200 | 766,675 ms | 6,1147 ms | 5,7197 ms | 0,43 |
SpartialParallelOptimizations | 1200 | 248,801 ms | 0,6040 ms | 0,5043 ms | 0,14 |
SpartialVectorOptimizations | 1200 | 185,628 ms | 2,1240 ms | 1,9868 ms | 0,11 |
SpartialParallelVectorOptimizations | 1200 | 76,770 ms | 0,3021 ms | 0,2522 ms | 0,04 |
Základná línia | 2400 | 14 533,335 ms | 63,3518 ms | 52,9016 ms | 1,00 |
Čiastočná optimalizácia | 2400 | 6 507,878 ms | 28,2317 ms | 26,4079 ms | 0,45 |
SpartialParallelOptimizations | 2400 | 2 076,403 ms | 20,8320 ms | 19,4863 ms | 0,14 |
SpartialVectorOptimizations | 2400 | 2 568,676 ms | 31,7359 ms | 29,6858 ms | 0,18 |
SpartialParallelVectorOptimizations | 2400 | 1 281,877 ms | 25,1127 ms | 64,8239 ms | 0,09 |
Základná línia | 4800 | 119 768,219 ms | 181,5514 ms | 160,9406 ms | 1,00 |
Čiastočná optimalizácia | 4800 | 55 590,374 ms | 414,6051 ms | 387,8218 ms | 0,46 |
SpartialParallelOptimizations | 4800 | 15 614,506 ms | 115,6996 ms | 102,5647 ms | 0,13 |
SpartialVectorOptimizations | 4800 | 18 257,991 ms | 84,5978 ms | 79,1329 ms | 0,15 |
SpartialParallelVectorOptimizations | 4800 | 12 785,824 ms | 98,6947 ms | 87,4903 ms | 0,11 |