כל הדוגמאות לעיל מייצגות בעיה של מציאת הנתיב הקצר ביותר בין שני קודקודים בגרף (מסלול או נתיב בין שני מקומות, מספר פעולות או מורכבות של קבלת דף נייר ממקום זה או אחר). מחלקה זו של בעיות הנתיב הקצר ביותר נקראת SSSP (Single Source Shortest Path) והאלגוריתם הבסיסי לפתרון בעיות אלו הוא , בעל מורכבות חישובית O(n^2)
.
אבל, לפעמים אנחנו צריכים למצוא את כל הנתיבים הקצרים ביותר בין כל הקודקודים. שקול את הדוגמה הבאה: אתה יוצר עבורך מפה תנועות קבועות בין הבית , העבודה והתיאטרון . בתרחיש זה תקבלו 6 מסלולים: work ⭢ home
, home ⭢ work
, work ⭢ theatre
, theatre ⭢ work
, home ⭢ theatre
theatre ⭢ home
(המסלולים ההפוכים יכולים להיות שונים בגלל כבישים חד-סטריים למשל) .
A(k, n) = n! / (n - m)! // where n - is a number of elements, // k - is a number of elements in permutation (in our case k = 2)
מה שנותן לנו 12 מסלולים ל-4 מקומות ו-90 מסלולים ל-10 מקומות (וזה מרשים). שימו לב... זאת מבלי להתחשב בנקודות ביניים בין מקומות, כלומר, כדי להגיע מהבית לעבודה צריך לחצות 4 רחובות, ללכת לאורך הנהר ולחצות את הגשר. אם נדמיין, למסלולים מסוימים יכולים להיות נקודות ביניים משותפות... ובכן... כתוצאה מכך יהיה לנו גרף גדול מאוד, עם הרבה קודקודים, כאשר כל קודקוד ייצג או מקום או נקודת ביניים משמעותית. מחלקת הבעיות, שבה עלינו למצוא את כל הנתיבים הקצרים ביותר בין כל זוגות הקודקודים בגרף, נקראת APSP (All Pairs Shortest Paths) והאלגוריתם הבסיסי לפתרון בעיות אלו הוא , שיש לו O(n^3)
מורכבות חישובית.
אלגוריתם פלויד-ורשל מוצא את כל הנתיבים הקצרים ביותר בין כל זוג קודקודים בגרף. האלגוריתמים פורסמו על ידי רוברט פלויד ב-[1] (ראה סעיף "הפניות" לפרטים נוספים). באותה שנה, פיטר אינגרמן ב-[2] תיאר יישום מודרני של האלגוריתם בצורה של שלושה לולאות for
:
algorithm FloydWarshall(W) do for k = 0 to N - 1 do for i = 0 to N - 1 do for j = 0 to N - 1 do W[i, j] = min(W[i, j], W[i, k] + W[k, j]) end for end for end for end algorithm // where W - is a weight matrix of N x N size, // min() - is a function which returns lesser of it's arguments
אם מעולם לא היה לך שינוי לעבוד עם גרף המיוצג בצורה של מטריצה אז זה יכול להיות קשה להבין מה האלגוריתם לעיל עושה. אז, כדי להבטיח שאנחנו באותו עמוד, הבה נבחן כיצד ניתן לייצג גרף בצורה של מטריצה ומדוע ייצוג כזה מועיל לפתרון בעיית הנתיב הקצר ביותר.
התמונה למטה ממחישה גרף של 5 קודקודים. משמאל, הגרף מוצג בצורה ויזואלית, העשוי מעיגולים וחצים, כאשר כל עיגול מייצג קודקוד וחץ מייצג קצה עם כיוון. מספר בתוך עיגול מתאים למספר קודקוד ומספר מעל קצה מתאים למשקל הקצה. בצד ימין, אותו גרף מוצג בצורה של מטריצת משקל. מטריצת משקל היא צורה של שבה כל תא מטריצה מכיל "משקל" - מרחק בין קודקוד i
(שורה) לקודקוד j
(עמודה). מטריצת משקל לא כוללת מידע על "נתיב" בין קודקודים (רשימת קודקודים שדרכם מגיעים מ- i
ל- j
) - רק משקל (מרחק) בין קודקודים אלה.
במטריצת משקל אנו יכולים לראות שערכי התא שווים למשקולות בין קודקודים . לכן, אם נבדוק נתיבים מקודקוד 0
(שורה 0
), נראה ש... יש רק נתיב אחד - מ 0
ל 1
. אבל, על ייצוג חזותי אנו יכולים לראות בבירור נתיבים מקודקוד 0
לקודקודים 2
ו 3
(דרך קודקוד 1
). הסיבה לכך פשוטה - במצב התחלתי, מטריצת משקל מכילה מרחק בין קודקודים סמוכים בלבד. עם זאת, מידע זה לבדו מספיק כדי למצוא את השאר.
בוא נראה איך זה עובד. שימו לב לתא W[0, 1]
. הערך שלו מצביע על כך שיש נתיב מקודקוד 0
לקודקוד 1
עם משקל שווה ל 1
(בקיצור נוכל לכתוב כמו: 0 ⭢ 1: 1
). עם הידע הזה, אנחנו יכולים כעת לסרוק את כל התאים של שורה 1
(המכילה את כל המשקלים של כל הנתיבים מקודקוד 1
) ולהעביר את המידע הזה בחזרה לשורה 0
, ולהגדיל את המשקל ב 1
(ערך של W[0, 1]
).
באמצעות אותם שלבים נוכל למצוא נתיבים מקודקוד 0
דרך קודקודים אחרים. במהלך החיפוש, יכול לקרות שיש יותר מנתיב אחד המוביל לאותו קודקוד ומה שחשוב יותר המשקלים של נתיבים אלו יכולים להיות שונים. דוגמה למצב כזה מומחשת בתמונה למטה, כאשר חיפוש מקודקוד 0
עד קודקוד 2
גילה נתיב נוסף לקודקוד 3
במשקל קטן יותר.
יש לנו שני נתיבים: נתיב מקורי – 0 ⭢ 3: 4
ונתיב חדש שגילינו זה עתה – 0 ⭢ 2 ⭢ 3: 3
(זכור, מטריצת משקל לא מכילה נתיבים, אז אנחנו לא יודעים איזה קודקודים כלולים בנתיב המקורי). זה הרגע שבו אנחנו מקבלים החלטה לשמור על הקצר ביותר ולכתוב 3
לתא W[0, 3]
.
algorithm FloydWarshall(W) do for k = 0 to N - 1 do for i = 0 to N - 1 do for j = 0 to N - 1 do W[i, j] = min(W[i, j], W[i, k] + W[k, j]) end for end for end for end algorithm
המחזור החיצוני ביותר for
on k
עובר על כל הקודקודים בגרף ובכל איטרציה, משתנה k
מייצג קודקוד שאנו מחפשים דרכו . מחזור פנימי for
on i
חוזר גם על כל הקודקודים בגרף ובכל איטרציה, i
מייצג קודקוד שממנו אנו מחפשים נתיבים . ולבסוף, מחזור פנימי ביותר for
ב- j
איטרציות על כל הקודקודים בגרף ובכל איטרציה, j
מייצג קודקוד אליו אנו מחפשים נתיב. בשילוב זה נותן לנו את הדברים הבאים: בכל איטרציה k
אנחנו מחפשים נתיבים מכל הקודקודים i
לתוך כל הקודקודים j
דרך קודקוד k
. בתוך מחזור נשווה נתיב i ⭢ j
(מיוצג על ידי W[i, j]
) עם נתיב i ⭢ k ⭢ j
(מיוצג על ידי סכום של W[I, k]
ו- W[k, j]
) וכותבים את הקצר ביותר אחד בחזרה לתוך W[i, j]
.
קוד המקור והגרפים הניסיוניים זמינים ב-GitHub. ניתן למצוא את הגרפים הניסיוניים בספריית Data/Sparse-Graphs.zip
. כל המדדים בפוסט זה מיושמים בקובץ .
NO_EDGE = (int.MaxValue / 2) - 1
.
לגבי מס' 1. כאשר אנו מדברים על מטריצות אנו מגדירים תאים במונחים של "שורות" ו"עמודות". בגלל זה נראה טבעי לדמיין מטריצה בצורה של "ריבוע" או "מלבן" (תמונה 4א).
i = row * row_size + col; // where row - cell row index, // col - cell column index, // row_size - number of cells in a row.
מערך ליניאלי של מטריצת משקל הוא תנאי מוקדם לביצוע יעיל של אלגוריתם פלויד-ורשל. הסיבה לכך אינה פשוטה והסבר מפורט ראוי לפוסט נפרד... או כמה פוסטים. עם זאת, נכון לעכשיו, חשוב להזכיר שייצוג כזה משפר באופן משמעותי , שלמעשה יש לו השפעה רבה על ביצועי האלגוריתם.
- הערת המחבר
לגבי מס' 2. אם תסתכל מקרוב על האלגוריתם פסאודוקוד, לא תמצא שום בדיקה הקשורה לקיומו של נתיב בין שני קודקודים. במקום זאת, פסאודוקוד פשוט השתמש בפונקציה min()
. הסיבה פשוטה - במקור, אם אין נתיב בין לקודקודים, ערך תא מוגדר infinity
ובכל השפות, למעט שעשויה להיות JavaScript, כל הערכים קטנים infinity
. במקרה של מספרים שלמים זה עשוי להיות מפתה להשתמש ב- int.MaxValue
כערך "ללא נתיב". עם זאת, הדבר יוביל לגלישה של מספרים שלמים במקרים בהם הערכים של שני הנתיבים i ⭢ k
ו- k ⭢ j
יהיו שווים ל- int.MaxValue
. לכן אנו משתמשים בערך שהוא אחד פחות מחצי מה- int.MaxValue
.
- קורא מתחשב
זה אכן אפשרי אבל למרבה הצער זה יוביל לעונש ביצוע משמעותי. בקיצור, CPU שומר נתונים סטטיסטיים של תוצאות הערכת ענפים, למשל. כאשר חלק מהצהרות if
מוערכות true
או false
. הוא משתמש בסטטיסטיקה זו כדי לבצע קוד של "ענף חזוי סטטיסטית" מראש לפני הערכת הצהרת if
בפועל (זה נקרא ) ולכן לבצע קוד בצורה יעילה יותר. עם זאת, כאשר חיזוי CPU אינו מדויק, הוא גורם לאובדן ביצועים משמעותי בהשוואה לחיזוי נכון וביצוע ללא תנאי מכיוון שה-CPU צריך לעצור ולחשב את הענף הנכון.
מכיוון שבכל איטרציה k
אנו מעדכנים חלק ניכר ממטריצת המשקל, הסטטיסטיקה של ענפי ה-CPU הופכת חסרת תועלת מכיוון שאין תבנית קוד, כל הענפים מבוססים אך ורק על נתונים. אז בדיקה כזו תגרום לכמות משמעותית של .
אהה, נראה שסיימנו עם החלק התיאורטי - בוא ניישם את האלגוריתם (אנו מציינים את היישום הזה Baseline
):
public void Baseline(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { for (var i = 0; i < sz; ++i) { for (var j = 0; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } } } }
הקוד שלמעלה הוא כמעט עותק זהה של הפסאודוקוד שהוזכר קודם לכן למעט חריג בודד - במקום Math.Min()
אנו משתמשים if
כדי לעדכן תא רק בעת הצורך.
- קורא מתחשב
הסיבה פשוטה. ברגע הכתיבה JIT פולט קוד כמעט שווה ליישומים של if
ו- Math.Min
. אתה יכול לבדוק את זה בפרטים ב- אבל הנה קטעים של גופי הלולאה הראשיים:
// // == Assembly code of implementation of innermost loop for of j using if. // 53: movsxd r14, r14d // // compare matrix[i * sz + j] and distance (Condition) // 56: cmp [rdx+r14*4+0x10], ebx 5b: jle short 64 // // if matrix[i * sz + j] greater than distance write distance to matrix // 5d: movsxd rbp, ebp 60: mov [rdx+rbp*4+0x10], ebx 64: // end of loop // // == Assembly code of implementation of innermost loop for of j using Math.Min. // 4f: movsxd rbp, ebp 52: mov r14d, [rdx+rbp*4+0x10] // // compare matrix[i * sz + j] and distance (Condition) // 57: cmp r14d, ebx. // // if matrix[i * sz + j] less than distance write value to matrix // 5a: jle short 5e // // otherwise write distance to matrix // 5c: jmp short 61 5e: mov ebx, r14d 61: mov [rdx+rbp*4+0x10], ebx 65: // end of loop
אתה עשוי לראות, ללא קשר אם אנו משתמשים if
או Math.Min
עדיין יש בדיקה מותנית. עם זאת, במקרה של if
אין כתוב מיותר.
var max = v * (v - 1)) / 2; // where v - is a number of vertexes in a graph.
בואו נדנד!
שִׁיטָה | גוֹדֶל | מְמוּצָע | שְׁגִיאָה | StdDev |
---|---|---|---|---|
קו בסיס | 300 | 27.525 אלפיות השנייה | 0.1937 אלפיות השנייה | 0.1617 אלפיות השנייה |
קו בסיס | 600 | 217.897 אלפיות השנייה | 1.6415 אלפיות השנייה | 1.5355 אלפיות השנייה |
קו בסיס | 1200 | 1,763.335 אלפיות השנייה | 7.4561 אלפיות השנייה | 6.2262 אלפיות השנייה |
קו בסיס | 2400 | 14,533.335 אלפיות השנייה | 63.3518 אלפיות השנייה | 52.9016 MS |
קו בסיס | 4800 | 119,768.219 MS | 181.5514 MS | 160.9406 אלפיות השנייה |
מהתוצאות שאנו יכולים לראות, זמן החישוב גדל באופן דרמטי בהשוואה לגודל של גרף - עבור גרף של 300 קודקודים זה לקח 27 מילישניות, עבור גרף של 2400 קודקוד - 14.5 שניות ולגרף של 4800 - 119 שניות שהוא כמעט 2 דקות !
עם זאת, בניסויים שלנו אנו משתמשים בגרפים מכוונים, א-מחזוריים , בעלי תכונה נפלאה - אם יש נתיב מקודקוד 1
לקודקוד 2
, אז אין נתיב מקודקוד 2
לקודקוד 1
. עבורנו, זה אומר, יש הרבה קודקודים לא סמוכים שאנחנו יכולים לדלג עליהם אם אין נתיב מ- i
ל- k
(אנו מציינים את היישום הזה כ- SpartialOptimisation
).
public void SpartialOptimisation(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { for (var i = 0; i < sz; ++i) { if (matrix[i * sz + k] == NO_EDGE) { continue; } for (var j = 0; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } } } }
להלן התוצאות של יישומים קודמים ( Baseline
) והנוכחיים ( SpartialOptimisation
) על אותה סט של גרפים (התוצאות המהירות ביותר מודגשות בהדגשה ):
שִׁיטָה | גוֹדֶל | מְמוּצָע | שְׁגִיאָה | StdDev | יַחַס |
---|---|---|---|---|---|
קו בסיס | 300 | 27.525 אלפיות השנייה | 0.1937 אלפיות השנייה | 0.1617 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 300 | 12.399 אלפיות השנייה | 0.0943 אלפיות השנייה | 0.0882 אלפיות השנייה | 0.45 |
קו בסיס | 600 | 217.897 אלפיות השנייה | 1.6415 אלפיות השנייה | 1.5355 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 600 | 99.122 אלפיות השנייה | 0.8230 אלפיות השנייה | 0.7698 אלפיות השנייה | 0.45 |
קו בסיס | 1200 | 1,763.335 אלפיות השנייה | 7.4561 אלפיות השנייה | 6.2262 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 1200 | 766.675 אלפיות השנייה | 6.1147 אלפיות השנייה | 5.7197 אלפיות השנייה | 0.43 |
קו בסיס | 2400 | 14,533.335 אלפיות השנייה | 63.3518 אלפיות השנייה | 52.9016 MS | 1.00 |
אופטימיזציה חלקית | 2400 | 6,507.878 אלפיות השנייה | 28.2317 MS | 26.4079 MS | 0.45 |
קו בסיס | 4800 | 119,768.219 MS | 181.5514 MS | 160.9406 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 4800 | 55,590.374 אלפיות השנייה | 414.6051 MS | 387.8218 אלפיות השנייה | 0.46 |
המחשב שלי מצויד במעבד Inter Core i7-7700 CPU 3.60GHz (Kaby Lake)
עם 8 מעבדים לוגיים ( ) או 4 ליבות עם טכנולוגיית . להחזיק יותר מליבה אחת זה כמו שיש לנו יותר "ידיים פנויות" שאנו יכולים להפעיל. אז בואו נראה איזה חלק בעבודה ניתן לחלק ביעילות בין מספר עובדים ולאחר מכן, מקבילים אותו.
נתחיל מ- for of k
לולאה. בכל לולאת איטרציה מחשב נתיבים מכל קודקוד לכל קודקוד דרך קודקוד k
. נתיבים חדשים ומעודכנים מאוחסנים במטריצת משקל. בהסתכלות על איטרציות שאולי תבחין בהן - ניתן לבצע אותן בכל סדר: 0, 1, 2, 3 או 3, 1, 2, 0 מבלי להשפיע על התוצאה. עם זאת, הם עדיין צריכים להתבצע ברצף, אחרת חלק מהאיטרציות לא יוכלו להשתמש בנתיבים חדשים או מעודכנים מכיוון שהם עדיין לא ייכתבו במטריצת משקל. חוסר עקביות כזה בהחלט ירסק את התוצאות. אז אנחנו צריכים להמשיך לחפש.
המועמד הבא הוא for of i
loop. בכל לולאת איטרציה קורא נתיב מקודקוד i
לקודקוד k
(תא: W[i, k]
), נתיב מקודקוד k
לקודקוד j
(תא: W[k, j
]) ולאחר מכן בודק אם נתיב ידוע מ- i
ל- j
(תא: W[i, j]
) קצר יותר מנתיב i ⭢ k ⭢ j
(סכום של: W[i, k]
+ W[k, j]
). אם תסתכלו מקרוב על דפוס הגישה, אולי תשימו לב - בכל איטרציה i
loop קורא נתונים מ- k
row ו- i
row מעודכן, מה שאומר בעצם - איטרציות הן עצמאיות וניתנות לביצוע לא רק בכל סדר אלא גם במקביל!
זה נראה מבטיח, אז בואו ליישם את זה (אנחנו מציינים את היישום הזה בתור SpartialParallelOptimisations
).
for of j
ניתן גם להקביל. עם זאת, הקבלה של המחזור הפנימי ביותר במקרה זה היא מאוד לא יעילה. אתה יכול לאמת זאת בעצמך על ידי ביצוע מספר שינויים פשוטים .
- הערת המחבר
public void SpartialParallelOptimisations(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { Parallel.For(0, sz, i => { if (matrix[i * sz + k] == NO_EDGE) { return; } for (var j = 0; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } }); } }
להלן התוצאות של יישומי Baseline
, SpartialOptimisation
ו- SpartialParallelOptimisations
על אותה סט של גרפים (המקבילות מתבצעת באמצעות מחלקה ):
שִׁיטָה | גוֹדֶל | מְמוּצָע | שְׁגִיאָה | StdDev | יַחַס |
---|---|---|---|---|---|
קו בסיס | 300 | 27.525 אלפיות השנייה | 0.1937 אלפיות השנייה | 0.1617 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 300 | 12.399 אלפיות השנייה | 0.0943 אלפיות השנייה | 0.0882 אלפיות השנייה | 0.45 |
SpartialParallelOptimisations | 300 | 6.252 אלפיות השנייה | 0.0211 אלפיות השנייה | 0.0187 אלפיות השנייה | 0.23 |
קו בסיס | 600 | 217.897 אלפיות השנייה | 1.6415 אלפיות השנייה | 1.5355 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 600 | 99.122 אלפיות השנייה | 0.8230 אלפיות השנייה | 0.7698 אלפיות השנייה | 0.45 |
SpartialParallelOptimisations | 600 | 35.825 אלפיות השנייה | 0.1003 אלפיות השנייה | 0.0837 אלפיות השנייה | 0.16 |
קו בסיס | 1200 | 1,763.335 אלפיות השנייה | 7.4561 אלפיות השנייה | 6.2262 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 1200 | 766.675 אלפיות השנייה | 6.1147 אלפיות השנייה | 5.7197 אלפיות השנייה | 0.43 |
SpartialParallelOptimisations | 1200 | 248.801 אלפיות השנייה | 0.6040 אלפיות השנייה | 0.5043 אלפיות השנייה | 0.14 |
קו בסיס | 2400 | 14,533.335 אלפיות השנייה | 63.3518 אלפיות השנייה | 52.9016 MS | 1.00 |
אופטימיזציה חלקית | 2400 | 6,507.878 אלפיות השנייה | 28.2317 MS | 26.4079 אלפיות השנייה | 0.45 |
SpartialParallelOptimisations | 2400 | 2,076.403 אלפיות השנייה | 20.8320 אלפיות השנייה | 19.4863 אלפיות השנייה | 0.14 |
קו בסיס | 4800 | 119,768.219 MS | 181.5514 MS | 160.9406 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 4800 | 55,590.374 אלפיות השנייה | 414.6051 MS | 387.8218 אלפיות השנייה | 0.46 |
SpartialParallelOptimisations | 4800 | 15,614.506 אלפיות השנייה | 115.6996 אלפיות השנייה | 102.5647 אלפיות השנייה | 0.13 |
מהתוצאות אנו יכולים לראות שהמקבילות של for of i
loop הפחיתה את זמן החישוב פי 2-4 בהשוואה ליישום הקודם ( SpartialOptimisation
)! זה מאוד מרשים, עם זאת, חשוב לזכור, מקבילה של חישובים טהורים צורכת את כל משאבי המחשוב הזמינים מה שעלול להוביל להרעבת משאבים של יישומים אחרים במערכת.
var a = new [] { 5, 7, 8, 1 }; var b = new [] { 4, 2, 2, 6 }; var c = new [] { 0, 0, 0, 0 }; for (var i = 0; i < 4; ++i) c[i] = a[i] + b[i];
בצורה פשוטה יתרה , ניתן להמחיש את הביצוע של איטרציה 0
של for
לעיל ברמת CPU באופן הבא:
הנה מה שקורה. CPU טוען ערכים של מערכים a
ו- b
מהזיכרון לתוך אוגרי CPU (אוגר אחד יכול להכיל בדיוק ערך אחד ). לאחר מכן ה-CPU מבצע פעולת הוספה סקלרית ברישמים אלה וכותב את התוצאה בחזרה לזיכרון הראשי - ישר לתוך מערך c
. תהליך זה חוזר על עצמו ארבע פעמים לפני שהלולאה מסתיימת.
הנה מה שקורה. CPU טוען ערכים של מערכים a
ו- b
מהזיכרון לתוך אוגרי CPU (עם זאת, הפעם, אוגר וקטור אחד יכול להחזיק שני ערכי מערך). לאחר מכן, ה-CPU מבצע פעולת הוספה וקטורית ברישוםים אלה וכותב את התוצאה בחזרה לזיכרון הראשי - ישר לתוך מערך c
. מכיוון שאנו פועלים על שני ערכים בבת אחת, התהליך הזה חוזר על עצמו פעמיים במקום ארבע.
public void SpartialVectorOptimisations(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { for (var i = 0; i < sz; ++i) { if (matrix[i * sz + k] == NO_EDGE) { continue; } var ik_vec = new Vector<int>(matrix[i * sz + k]); var j = 0; for (; j < sz - Vector<int>.Count; j += Vector<int>.Count) { var ij_vec = new Vector<int>(matrix, i * sz + j); var ikj_vec = new Vector<int>(matrix, k * sz + j) + ik_vec; var lt_vec = Vector.LessThan(ij_vec, ikj_vec); if (lt_vec == new Vector<int>(-1)) { continue; } var r_vec = Vector.ConditionalSelect(lt_vec, ij_vec, ikj_vec); r_vec.CopyTo(matrix, i * sz + j); } for (; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } } } }
קוד מוקטורי יכול להיראות קצת מוזר, אז בואו נעבור עליו צעד אחר צעד.
אנו יוצרים וקטור של i ⭢ k
נתיב: var ik_vec = new Vector<int>(matrix[i * sz + k])
. כתוצאה מכך, אם וקטור יכול להחזיק ארבעה ערכים של סוג int
ומשקל של נתיב i ⭢ k
שווה ל-5, ניצור וקטור של ארבעה 5 - [5, 5, 5, 5]. לאחר מכן, בכל איטרציה, אנו מחשבים בו זמנית נתיבים מקודקוד i
לקודקודים j, j + 1, ..., j + Vector<int>.Count
.
מאפיין Vector<int>.Count
מחזיר מספר אלמנטים מסוג int
שמתאים לרגיסטרים וקטורים. הגודל של אוגרים וקטוריים תלוי במודל המעבד, כך שמאפיין זה יכול להחזיר ערכים שונים במעבדים שונים.
- הערת המחבר
ij_vec
ו- ikj_vec
.ij_vec
ו- ikj_vec
ובחר את הערכים הקטנים ביותר לתוך וקטור חדש r_vec
.r_vec
בחזרה למטריצת המשקל.
בעוד שמספר 1 ו -3 הם די פשוטים, מס' 2 דורש הסבר. כפי שהוזכר קודם לכן, עם וקטורים אנו מבצעים מניפולציות על מספר ערכים בו זמנית. לכן, תוצאה של פעולות מסוימות לא יכולה להיות יחידה, כלומר פעולת השוואה Vector.LessThan(ij_vec, ikj_vec)
לא יכולה להחזיר true
או false
מכיוון שהיא משווה ערכים מרובים. אז במקום זה הוא מחזיר וקטור חדש שמכיל תוצאות השוואה בין ערכים תואמים מהוקטורים ij_vec
ו- ikj_vec
( -1
, אם הערך מ- ij_vec
קטן מהערך מ- ikj_vec
ו 0
אם אחרת). וקטור שהוחזר (כשלעצמו) לא ממש שימושי, עם זאת, אנו יכולים להשתמש בפעולת הווקטור Vector.ConditionalSelect(lt_vec, ij_vec, ikj_vec)
כדי לחלץ את הערכים הנדרשים מהוקטורים ij_vec
ו- ikj_vec
לתוך וקטור חדש – r_vec
. פעולה זו מחזירה וקטור חדש שבו הערכים שווים לקטן משני ערכים תואמים של וקטורי קלט, כלומר, אם הערך של וקטור lt_vec
שווה ל -1
, אז הפעולה בוחרת ערך מ- ij_vec
, אחרת היא בוחרת ערך מ- ikj_vec
.
חוץ מס' 2 , יש עוד חלק אחד, דורש הסבר - הלולאה השנייה, הלא מוקטורית. זה נדרש כאשר גודל מטריצת המשקל אינו מכפלה של ערך Vector<int>.Count
. במקרה כזה הלולאה הראשית לא יכולה לעבד את כל הערכים (מכיוון שלא ניתן לטעון חלק מהווקטור) והלולאה השנייה, ללא וקטור, תחשב את האיטרציות הנותרות.
שִׁיטָה | sz | מְמוּצָע | שְׁגִיאָה | StdDev | יַחַס |
---|---|---|---|---|---|
קו בסיס | 300 | 27.525 אלפיות השנייה | 0.1937 אלפיות השנייה | 0.1617 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 300 | 12.399 אלפיות השנייה | 0.0943 אלפיות השנייה | 0.0882 אלפיות השנייה | 0.45 |
SpartialParallelOptimisations | 300 | 6.252 אלפיות השנייה | 0.0211 אלפיות השנייה | 0.0187 אלפיות השנייה | 0.23 |
SpartialVectorOptimisations | 300 | 3.056 אלפיות השנייה | 0.0301 אלפיות השנייה | 0.0281 אלפיות השנייה | 0.11 |
קו בסיס | 600 | 217.897 אלפיות השנייה | 1.6415 אלפיות השנייה | 1.5355 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 600 | 99.122 אלפיות השנייה | 0.8230 אלפיות השנייה | 0.7698 אלפיות השנייה | 0.45 |
SpartialParallelOptimisations | 600 | 35.825 אלפיות השנייה | 0.1003 אלפיות השנייה | 0.0837 אלפיות השנייה | 0.16 |
SpartialVectorOptimisations | 600 | 24.378 אלפיות השנייה | 0.4320 אלפיות השנייה | 0.4041 אלפיות השנייה | 0.11 |
קו בסיס | 1200 | 1,763.335 אלפיות השנייה | 7.4561 אלפיות השנייה | 6.2262 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 1200 | 766.675 אלפיות השנייה | 6.1147 אלפיות השנייה | 5.7197 אלפיות השנייה | 0.43 |
SpartialParallelOptimisations | 1200 | 248.801 אלפיות השנייה | 0.6040 אלפיות השנייה | 0.5043 אלפיות השנייה | 0.14 |
SpartialVectorOptimisations | 1200 | 185.628 אלפיות השנייה | 2.1240 אלפיות השנייה | 1.9868 אלפיות השנייה | 0.11 |
קו בסיס | 2400 | 14,533.335 אלפיות השנייה | 63.3518 אלפיות השנייה | 52.9016 MS | 1.00 |
אופטימיזציה חלקית | 2400 | 6,507.878 אלפיות השנייה | 28.2317 MS | 26.4079 MS | 0.45 |
SpartialParallelOptimisations | 2400 | 2,076.403 אלפיות השנייה | 20.8320 אלפיות השנייה | 19.4863 אלפיות השנייה | 0.14 |
SpartialVectorOptimisations | 2400 | 2,568.676 אלפיות השנייה | 31.7359 אלפיות השנייה | 29.6858 אלפיות השנייה | 0.18 |
קו בסיס | 4800 | 119,768.219 MS | 181.5514 MS | 160.9406 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 4800 | 55,590.374 אלפיות השנייה | 414.6051 MS | 387.8218 אלפיות השנייה | 0.46 |
SpartialParallelOptimisations | 4800 | 15,614.506 אלפיות השנייה | 115.6996 אלפיות השנייה | 102.5647 אלפיות השנייה | 0.13 |
SpartialVectorOptimisations | 4800 | 18,257.991 אלפיות השנייה | 84.5978 אלפיות השנייה | 79.1329 אלפיות השנייה | 0.15 |
מהתוצאה ברור, הוקטוריזציה הפחיתה משמעותית את זמן החישוב - מפי 3 ל-4 בהשוואה לגרסה שאינה מקבילה ( SpartialOptimisation
). הרגע המעניין כאן הוא שהגרסה הווקטוריזית גם מתעלה על הגרסה המקבילה ( SpartialParallelOptimisations
) בגרפים קטנים יותר (פחות מ-2400 קודקודים).
אם אתה מעוניין ביישום מעשי של וקטוריזציה, אני ממליץ לך לקרוא סדרת פוסטים של שבה הוא ערך וקטוריזציה של Array.Sort
(תוצאות אלו מצאו את עצמן מאוחר יותר בעדכון עבור Garbage Collector ב- ).
היישום האחרון משלב מאמצים של הקבלה ווקטוריזציה ו... למען האמת הוא חסר אינדיבידואליות 🙂 כי... ובכן, זה עתה החלפנו גוף של Parallel.For
מ- SpartialParallelOptimisations
עם לולאה וקטורית מ- SpartialVectorOptimisations
:
public void SpartialParallelVectorOptimisations(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { Parallel.For(0, sz, i => { if (matrix[i * sz + k] == NO_EDGE) { return; } var ik_vec = new Vector<int>(matrix[i * sz + k]); var j = 0; for (; j < sz - Vector<int>.Count; j += Vector<int>.Count) { var ij_vec = new Vector<int>(matrix, i * sz + j); var ikj_vec = new Vector<int>(matrix, k * sz + j) + ik_vec; var lt_vec = Vector.LessThan(ij_vec, ikj_vec); if (lt_vec == new Vector<int>(-1)) { continue; } var r_vec = Vector.ConditionalSelect(lt_vec, ij_vec, ikj_vec); r_vec.CopyTo(matrix, i * sz + j); } for (; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } }); } }
כל התוצאות של פוסט זה מוצגות למטה. כצפוי, שימוש בו-זמני בהקבלה ובווקטוריזציה הוכיח את התוצאות הטובות ביותר, והפחית את זמן החישוב עד פי 25 (עבור גרפים של 1200 קודקודים) בהשוואה ליישום הראשוני.
שִׁיטָה | sz | מְמוּצָע | שְׁגִיאָה | StdDev | יַחַס |
---|---|---|---|---|---|
קו בסיס | 300 | 27.525 אלפיות השנייה | 0.1937 אלפיות השנייה | 0.1617 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 300 | 12.399 אלפיות השנייה | 0.0943 אלפיות השנייה | 0.0882 אלפיות השנייה | 0.45 |
SpartialParallelOptimisations | 300 | 6.252 אלפיות השנייה | 0.0211 אלפיות השנייה | 0.0187 אלפיות השנייה | 0.23 |
SpartialVectorOptimisations | 300 | 3.056 אלפיות השנייה | 0.0301 אלפיות השנייה | 0.0281 אלפיות השנייה | 0.11 |
SpartialParallelVectorOptimisations | 300 | 3.008 אלפיות השנייה | 0.0075 אלפיות השנייה | 0.0066 אלפיות השנייה | 0.11 |
קו בסיס | 600 | 217.897 אלפיות השנייה | 1.6415 אלפיות השנייה | 1.5355 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 600 | 99.122 אלפיות השנייה | 0.8230 אלפיות השנייה | 0.7698 אלפיות השנייה | 0.45 |
SpartialParallelOptimisations | 600 | 35.825 אלפיות השנייה | 0.1003 אלפיות השנייה | 0.0837 אלפיות השנייה | 0.16 |
SpartialVectorOptimisations | 600 | 24.378 אלפיות השנייה | 0.4320 אלפיות השנייה | 0.4041 אלפיות השנייה | 0.11 |
SpartialParallelVectorOptimisations | 600 | 13.425 אלפיות השנייה | 0.0319 אלפיות השנייה | 0.0283 אלפיות השנייה | 0.06 |
קו בסיס | 1200 | 1,763.335 אלפיות השנייה | 7.4561 אלפיות השנייה | 6.2262 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 1200 | 766.675 אלפיות השנייה | 6.1147 אלפיות השנייה | 5.7197 אלפיות השנייה | 0.43 |
SpartialParallelOptimisations | 1200 | 248.801 אלפיות השנייה | 0.6040 אלפיות השנייה | 0.5043 אלפיות השנייה | 0.14 |
SpartialVectorOptimisations | 1200 | 185.628 אלפיות השנייה | 2.1240 אלפיות השנייה | 1.9868 אלפיות השנייה | 0.11 |
SpartialParallelVectorOptimisations | 1200 | 76.770 אלפיות השנייה | 0.3021 אלפיות השנייה | 0.2522 אלפיות השנייה | 0.04 |
קו בסיס | 2400 | 14,533.335 אלפיות השנייה | 63.3518 אלפיות השנייה | 52.9016 MS | 1.00 |
אופטימיזציה חלקית | 2400 | 6,507.878 אלפיות השנייה | 28.2317 MS | 26.4079 אלפיות השנייה | 0.45 |
SpartialParallelOptimisations | 2400 | 2,076.403 אלפיות השנייה | 20.8320 אלפיות השנייה | 19.4863 אלפיות השנייה | 0.14 |
SpartialVectorOptimisations | 2400 | 2,568.676 אלפיות השנייה | 31.7359 אלפיות השנייה | 29.6858 אלפיות השנייה | 0.18 |
SpartialParallelVectorOptimisations | 2400 | 1,281.877 אלפיות השנייה | 25.1127 אלפיות השנייה | 64.8239 אלפיות השנייה | 0.09 |
קו בסיס | 4800 | 119,768.219 MS | 181.5514 MS | 160.9406 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 4800 | 55,590.374 אלפיות השנייה | 414.6051 MS | 387.8218 אלפיות השנייה | 0.46 |
SpartialParallelOptimisations | 4800 | 15,614.506 אלפיות השנייה | 115.6996 אלפיות השנייה | 102.5647 אלפיות השנייה | 0.13 |
SpartialVectorOptimisations | 4800 | 18,257.991 אלפיות השנייה | 84.5978 אלפיות השנייה | 79.1329 אלפיות השנייה | 0.15 |
SpartialParallelVectorOptimisations | 4800 | 12,785.824 אלפיות השנייה | 98.6947 אלפיות השנייה | 87.4903 אלפיות השנייה | 0.11 |