W systemie działa pewna liczba procesów zajmujących się przetwarzaniem zadań. Każde zadanie jest wykonywane przez K procesów (K > 1). Każdy proces (w nieskończonej pętli) zgłasza się do pracy, otrzymuje numer kolejny z przedziału od 1 do K, a następnie czeka na zgłoszenie się wszystkich K procesów. Ostatni (K-ty) proces pobiera zadanie do przetwarzanie (function pobierz_zadanie(nr_zadania: integer) : DANE). Pobrane zadanie jest następnie przetwarzane sekwencyjnie przez wszystkie procesy z tej grupy, począwszy od pierwszego procesu (tj. procesu, który przy zgłoszeniu otrzymał numer 1) aż do ostatniego (procedure przetwarzaj(var dane: DANE; nr : 1..K). Po zakończeniu przetwarzania każdy proces ponownie zgłasza się do pracy. Zakończenie przetwarzania zadania następuje po zakończeniu przetwarzania tego zadania przez wszystkie procesy z grupy. Po zakończeniu przetwarzania przez grupę wysyłany jest rezultat przetwarzania (procedure wyslij_rezultat(nr_zadania: integer, dane: DANE)).
Równocześnie może być wykonywanych co najwyżej MAX zadań (opisanych wyżej; MAX>=1). Przetwarzanie różnych zadań może (i powinno) odbywać się równolegle przy czym przetwarzanie danego zadania przez i-ty proces z danej grupy może rozpocząć się dopiero po zakończeniu przetwarzania poprzedniego zadania przez i-ty proces z poprzedniej grupy. Zapisz przy użyciu semaforów treść procesów działających w tym systemie.
var: przetwarzane_dane: array [1..MAX] of DANE; process Proces() var: nr_etap: integer; nr_zadania: integer; index_synchronizacji_zadania: integer; begin while (true) do begin ... nr_praca := ... index_synchronizacji_zadania := ... {(1..MAX)} nr_etap := ... {(1..K)} ... if (nr_etap=K) then begin przetwarzane_dane[index_synchronizacji_zadania] = pobierz_zadanie(nr_praca); end; ... przetwarzaj(przetwarzane_dane[index_synchronizacji_zadania], nr_etap); ... if (...grupa przetworzyla zadanie...) then begin wyslij_rezultat(nr_praca, przetwarzane_dane[index_synchronizacji_zadania]); end; ... end; end;
Z treści zadania wynika, że etap I zadania N+1 zaczyna się po zakończeniu etapu I zadania N. Z tego wynika, że jeśli zaczynamy przetwarzać etap K zadania N, to wszystkie K etapy wszystkich wcześniejszych prac zostały zakończyły. Ponieważ równocześnie może się wykonywać MAX zadań, dlatego gdy rozpoczyna się zadanie I to mamy pewność, że pewne zadanie o numerze I-MAX...I-1 się zakończyło. Niech to będzie zadanie L (I-MAX...I-1). Wtedy dla wszystkich zadań Z<=L wszystkie K etapy zostały zakończone.
Z przedstawionego rozumowania wynika, że jeśli rozpoczyna się przetwarzanie zadania N to wszystkie K etapy przetwarzania zadania N-MAX zostały zakończone. Dlatego grupa przetwarzająca zadanie N może używać struktur danych używanych przy przetwarzaniu wszystkich K etapów zadania N-MAX. Z tego powodu w rozwiązaniu możemy używać struktur danych wielkości MAX (grupa przetwarzająca zadanie I-te wykorzystuje struktury wykorzystywane przez grupę która przetwarzała zadanie I-MAX).
var: ochrona: binary semaphore := 1; czekajacy_pracownicy: binary semaphore := 0; liczba_czekajacych_pracownikow: integer := 0; liczba_przydzielonych_zadan: integer := 0; liczba_zadan_wykonywanych: integer := 0; liczba_pracownikow_przydzielonych: integer := 0; kolejny_etap: array of [1..MAX,1..K] binary semaphore := {MAX*K times 0}; kolejne_zadanie_na_etapie: array [1..MAX,1..K] binary semaphore := {K times 1, MAX*(K-1) times 0}; { kolejne_zadanie_na_etapie[1,*]=1; others 0; } przetwarzane_dane: array[1..MAX] of DANE; process Proces() var: nr_etap: integer; nr_zadania: integer; index_synchronizacji_zadania: integer; begin while (true) do begin P(ochrona); if (liczba_zadan_wykonywanych=MAX) then begin liczba_czekajacych_pracownikow = liczba_czekajacych_pracownikow + 1; V(ochrona); P(czekajacy_pracownicy); liczba_czekajacych_pracownikow = liczba_czekajacych_pracownikow - 1; end; nr_etap := liczba_pracownikow_przydzielonych + 1; liczba_pracownikow_przydzielonych = liczba_pracownikow_przydzielonych + 1; nr_zadania := liczba_przydzielonych_zadan + 1; index_synchronizacji_zadania := liczba_przydzielonych_zadan mod MAX + 1; if (nr_etap=K) then begin liczba_pracownikow_przydzielonych := 0; liczba_przydzielonych_zadan := liczba_przydzielonych_zadan + 1; liczba_zadan_wykonywanych = liczba_zadan_wykonywanych + 1; przetwarzane_dane[index_synchronizacji_zadania] = pobierz_zadanie(nr_zadania); V(ochrona); V(kolejny_etap[index_synchronizacji_zadania, 1]); end; else begin if (liczba_czekajacych_pracownikow>0) then V(czekajacy_pracownicy); else V(ochrona); end; P(kolejny_etap[index_synchronizacji_zadania, nr_etap]); P(kolejne_zadanie_na_etapie[index_synchronizacji_zadania, nr_etap]); przetwarzaj(przetwarzane_dane[index_synchronizacji_zadania], nr_etap); if (nr_etap<K) then begin V(kolejne_zadanie_na_etapie[index_synchronizacji_zadania mod MAX + 1, nr_etap]); V(kolejny_etap[index_synchronizacji_zadania, nr_etap+1]); end; else begin wyslij_rezultat(nr_zadania, przetwarzane_dane[index_synchronizacji_zadania]); V(kolejne_zadanie_na_etapie[index_synchronizacji_zadania mod MAX + 1, nr_etap]); P(ochrona); liczba_zadan_wykonywanych = liczba_zadan_wykonywanych - 1; if (liczba_czekajacych_pracownikow>0) then V(czekajacy_pracownicy); else V(ochrona); end; end; end;
Istotne jest zachowanie ostatniego procesu przetwarzającego dane zadanie. Niech będzie to zadanie numer N. Proces ten pozwala wykonać etap (ostatni) następnego zadania (N+1), dopiero po wysłaniu rezultatu przetwarzania. W przeciwnym przypadku, mogłoby dojść do nadpisania wysyłanych danych przez nową grupę. Gdy ostatni proces zadania N działa w odwrotnej kolejności (najpierw pozwala wykonywać etap (ostatni) zadania N+1 a potem wysyła wynik) to możliwe jest, że po tym pozwoleniu a przed wysłaniem rezultatu, grupa N+1 zakończy przetwarzanie i pozwoli się wykonywać kolejnej grupie (a pośrednio także kolejnym grupom). Jedna z tych przedwcześnie wpuszczonych grup może nadpisać dane które zamierza wysłać proces (ostatni) grupy N.
Przy głębszej analizie, można zauważyć, że przedstawione rozwiązanie nie jest optymalne. Zobaczmy, zachowanie gdy grupa przetwarzająca pewne zadanie N przetworzyła już wszystkie K etapów, a w tym momencie proces K (ostatni) wysyłała rezultat. Załóżmy, że wysyłanie akurat rezultatu tego zadania trwa bardzo długo. W obecnym rozwiązaniu żadne z zadań o numerach Z>=N nie może zostać zakończone, tym samym żadne z zadań o numerach Z>=N+MAX nie może zostać rozpoczęte. Proces wysyłający rezultat zadania N używa tylko zasobu przetwarzane_dane[index_danych_zadania], nie potrzebuje już struktur synchronizacyjnych (kolejny_etap, kolejne_zadanie_na_etapie). W związku z tym gdy zakończy się pewne zadanie L (N+1..N+MAX-1) to można uruchomić nowe zadanie, które może korzystać ze struktur synchronizacyjnych używanych przez zadanie N. Jeśli chodzi o dane przetwarzane to nowo uruchamiane zadanie powinno używać wolnego (być może tego zwolnionego przez zadanie L) indeksu w tablicy przetwarzane_dane - gdyż rezultat w przetwarzane_dane[index_danych_zadania] jest obecnie wysyłany przez proces (ostatni) zadania N i nie można z tego zasobu korzystać. Z tego wynika, że dane synchronizacyjne powinny zostać oddzielone od danych przetwarzanych. Dane synchronizacyjne muszą być przetwarzane kolejno (kolejne zadania korzystają z kolejnych indeksów) natomiast dane przetwarzane nie muszą korzystać z kolejnych indeksów.
var: ochrona: binary semaphore := 1; czekajacy_pracownicy: binary semaphore := 0; liczba_czekajacych_pracownikow: integer := 0; liczba_przydzielonych_zadan: integer := 0; liczba_zadan_wykonywanych: integer := 0; liczba_pracownikow_przydzielonych: integer := 0; kolejny_etap: array of [1..MAX,1..K] binary semaphore := {MAX*K times 0}; kolejne_zadanie_na_etapie: array [1..MAX,1..K] binary semaphore := {K times 1, MAX*(K-1) times 0}; { kolejne_zadanie_na_etapie[1,*]=1; others 0; } przetwarzane_dane: array[1..MAX] of DANE; przetwarzane_dane_ochrona: binary semaphore := 0; {ochrona tablicy przetwarzane_dane} przetwarzane_dane_status: array [1..MAX] of (wolne,zajete) := [MAX times wolne]; {czy odpowiednia komorka w tablicy przetwarzane_dane jest aktyalnie uzywana} index_danych_nowego_zadania: integer; {indeks w tablicy dane z ktorego korzystaja procesy przetwarzajace aktualnie przydzielane zadanie} function zarezerwuj_miejsce_na_dane(): integer var i: integer; index: integer := 0; begin index := 0; P(przetwarzane_dane_ochrona); for i:=1 to MAX do begin if (przetwarzane_dane_status[i]=wolne) then begin index := i; przetwarzane_dane_status[index] := zajete; end; end; V(przetwarzane_dane_ochrona); return index; end; procedure zwolnij_miejsce_na_dane(int index) begin P(przetwarzane_dane_ochrona); przetwarzane_dane_status[i] := wolne; V(przetwarzane_dane_ochrona); end; process Proces() var: nr_etap: integer; nr_zadania: integer; index_synchronizacji_zadania: integer; index_danych_zadania: integer; {indeks w tablicy dane} begin while (true) do begin P(ochrona); if (liczba_zadan_wykonywanych=MAX) then begin liczba_czekajacych_pracownikow = liczba_czekajacych_pracownikow + 1; V(ochrona); P(czekajacy_pracownicy); liczba_czekajacych_pracownikow = liczba_czekajacych_pracownikow - 1; end; nr_etap := liczba_pracownikow_przydzielonych + 1; liczba_pracownikow_przydzielonych = liczba_pracownikow_przydzielonych + 1; nr_zadania := liczba_przydzielonych_zadan + 1; index_synchronizacji_zadania := liczba_przydzielonych_zadan mod MAX + 1; if (nr_etap=1) then begin index_danych_nowego_zadania := zarezerwuj_miejsce_na_dane(); {rezerwacja miejsca na dane - pewne miejsce jest wolne, gdyz proces ma zagwarantowane ze razem z jego zadaniem jest przetwarzanych nie wiecej niz MAX zadan} end; index_danych_zadania := index_danych_nowego_zadania; if (nr_etap=K) then begin liczba_pracownikow_przydzielonych := 0; liczba_przydzielonych_zadan := liczba_przydzielonych_zadan + 1; liczba_zadan_wykonywanych = liczba_zadan_wykonywanych + 1; V(ochrona); przetwarzane_dane[index_danych_zadania] = pobierz_zadanie(nr_zadania); V(kolejny_etap[index_synchronizacji_zadania, 1]); end; else begin if (liczba_czekajacych_pracownikow>0) then V(czekajacy_pracownicy); else V(ochrona); end; P(kolejny_etap[index_synchronizacji_zadania, nr_etap]); P(kolejne_zadanie_na_etapie[index_synchronizacji_zadania, nr_etap]); przetwarzaj(przetwarzane_dane[index_danych_zadania], nr_etap); V(kolejne_zadanie_na_etapie[index_synchronizacji_zadania mod MAX + 1, nr_etap]); if (nr_etap<K) then begin V(kolejny_etap[index_synchronizacji_zadania, nr_etap+1]); end; else begin wyslij_rezultat(nr_zadania, przetwarzane_dane[index_danych_zadania]); zwolnij_miejsce_na_dane(index_danych_zadania); {zwalnianie miejsca na dane} P(ochrona); liczba_zadan_wykonywanych = liczba_zadan_wykonywanych - 1; if (liczba_czekajacych_pracownikow>0) then V(czekajacy_pracownicy); else V(ochrona); end; end; end;
W tym rozwiązaniu przetwarzanie kolejnych zadań nie jest wstrzymywane na skutek długotrwałego wysyłania rezultatu pewnego zadania. Osiągnięte to zostało przez odseparowanie struktur wykorzystywanych do synchronizacji procesów od struktur wykorzystywanych do przetwarzania danych.