Java CyclicBarrier
CyclicBarrier Nədir?
CyclicBarrier müəyyən sayda thread-in bir nöqtədə toplanmasını və hamısı gələnə qədər gözləməsini təmin edən synchronization vasitəsidir. Bütün thread-lər barrier-ə çatdıqda, hamısı eyni anda davam edir.
Xüsusiyyətlər:
- Reusable - Təkrar istifadə edilə bilər
- Bütün thread-lər await() çağırmalıdır
- Barrier açıldıqdan sonra yenidən istifadə edilir
- Barrier action - Hamısı çatdıqda icra olunan əlavə əməliyyat
Basic Usage
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
public static void main(String[] args) {
int threadCount = 3;
CyclicBarrier barrier = new CyclicBarrier(threadCount);
for (int i = 1; i <= threadCount; i++) {
final int workerId = i;
new Thread(() -> {
try {
System.out.println("Worker " + workerId + " hazırlanır");
Thread.sleep(workerId * 1000);
System.out.println("Worker " + workerId + " barrier-də gözləyir");
barrier.await(); // Hamısı gələnə qədər gözlə
System.out.println("Worker " + workerId + " davam edir");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
Barrier Action
class BarrierActionExample {
public void runWithBarrierAction() {
int parties = 3;
// Hamısı çatdıqda bu action icra olunur
Runnable barrierAction = () -> {
System.out.println("=== Bütün thread-lər barrier-ə çatdı ===");
};
CyclicBarrier barrier = new CyclicBarrier(parties, barrierAction);
for (int i = 1; i <= parties; i++) {
final int id = i;
new Thread(() -> {
try {
System.out.println("Thread " + id + " işləyir");
Thread.sleep(id * 1000);
System.out.println("Thread " + id + " barrier-də");
barrier.await();
System.out.println("Thread " + id + " davam edir");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
Reusable Barrier
class ReusableBarrierExample {
public void multiplePhases() throws Exception {
int workers = 3;
CyclicBarrier barrier = new CyclicBarrier(workers);
for (int i = 1; i <= workers; i++) {
final int workerId = i;
new Thread(() -> {
try {
// Phase 1
System.out.println("Worker " + workerId + " - Phase 1");
barrier.await(); // 1-ci barrier
// Phase 2
System.out.println("Worker " + workerId + " - Phase 2");
barrier.await(); // 2-ci barrier (reuse)
// Phase 3
System.out.println("Worker " + workerId + " - Phase 3");
barrier.await(); // 3-cü barrier (reuse)
System.out.println("Worker " + workerId + " - Bitdi");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
Matrix Hesablama
class MatrixCalculation {
private final int workers = 4;
private final CyclicBarrier barrier;
private final double[][] matrix;
public MatrixCalculation(int size) {
this.matrix = new double[size][size];
// Hər iterasiyadan sonra barrier
this.barrier = new CyclicBarrier(workers, () -> {
System.out.println("İterasiya tamamlandı");
});
}
public void calculate() {
for (int i = 0; i < workers; i++) {
final int workerId = i;
new Thread(() -> {
try {
for (int iteration = 0; iteration < 10; iteration++) {
// Hər worker öz hissəsini hesablayır
calculatePartition(workerId);
// Hamısı bitənə qədər gözlə
barrier.await();
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
private void calculatePartition(int workerId) {
int partitionSize = matrix.length / workers;
int start = workerId * partitionSize;
int end = (workerId + 1) * partitionSize;
for (int i = start; i < end; i++) {
for (int j = 0; j < matrix[i].length; j++) {
matrix[i][j] = Math.random();
}
}
}
}
Paralel Search
class ParallelSearch {
private final List<Integer> data;
private final int target;
private final int workers = 4;
private volatile boolean found = false;
private final CyclicBarrier barrier;
public ParallelSearch(List<Integer> data, int target) {
this.data = data;
this.target = target;
this.barrier = new CyclicBarrier(workers, () -> {
if (found) {
System.out.println("Tapıldı!");
} else {
System.out.println("Bu mərhələdə tapılmadı");
}
});
}
public void search() {
int partitionSize = data.size() / workers;
for (int i = 0; i < workers; i++) {
final int workerId = i;
final int start = workerId * partitionSize;
final int end = (workerId == workers - 1) ?
data.size() : (workerId + 1) * partitionSize;
new Thread(() -> {
try {
for (int j = start; j < end; j++) {
if (data.get(j) == target) {
found = true;
System.out.println("Worker " + workerId +
" tapıldı index: " + j);
break;
}
}
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
await() with Timeout
class TimeoutBarrierExample {
public void processWithTimeout() {
CyclicBarrier barrier = new CyclicBarrier(3);
// 2 thread barrier-ə çatır
for (int i = 1; i <= 2; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() +
" barrier-də gözləyir");
barrier.await(5, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName() +
" davam edir");
} catch (TimeoutException e) {
System.out.println(Thread.currentThread().getName() +
" timeout!");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
// 3-cü thread gəlmir - timeout olacaq
}
}
Barrier Reset
class BarrierResetExample {
public void demonstrateReset() {
CyclicBarrier barrier = new CyclicBarrier(3);
// 2 thread başlayır
for (int i = 1; i <= 2; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() +
" gözləyir");
barrier.await();
} catch (BrokenBarrierException e) {
System.out.println(Thread.currentThread().getName() +
" - Barrier broken!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
try {
Thread.sleep(1000);
System.out.println("Barrier reset edilir");
barrier.reset(); // Gözləyən thread-lər BrokenBarrierException alır
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Game Simulation
class GameSimulation {
private final int players = 4;
private final CyclicBarrier turnBarrier;
private int currentRound = 0;
public GameSimulation() {
this.turnBarrier = new CyclicBarrier(players, () -> {
currentRound++;
System.out.println("=== Round " + currentRound + " başladı ===");
});
}
public void startGame() {
for (int i = 1; i <= players; i++) {
final int playerId = i;
new Thread(() -> {
try {
for (int round = 0; round < 5; round++) {
// Oyunçu move edir
makeMove(playerId, round);
// Bütün oyunçular move edənə qədər gözlə
turnBarrier.await();
}
System.out.println("Player " + playerId + " oyunu bitirdi");
} catch (Exception e) {
e.printStackTrace();
}
}, "Player-" + i).start();
}
}
private void makeMove(int playerId, int round) {
try {
Thread.sleep((int)(Math.random() * 1000));
System.out.println("Player " + playerId +
" round " + round + " move etdi");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Best Practices
-
Thread sayını düzgün təyin edin:
int parties = 3; // Dəqiq 3 thread await() çağırmalıdır
CyclicBarrier barrier = new CyclicBarrier(parties); -
Barrier action istifadə edin:
Runnable action = () -> System.out.println("Phase completed");
CyclicBarrier barrier = new CyclicBarrier(3, action); -
Timeout istifadə edin:
try {
barrier.await(10, TimeUnit.SECONDS);
} catch (TimeoutException e) {
// Handle timeout
} -
BrokenBarrierException-ı handle edin:
try {
barrier.await();
} catch (BrokenBarrierException e) {
// Barrier broken, yenidən yaradın
}
Common Use Cases
- Paralel alqoritmlər - Matrix hesablamaları, parallel processing
- Simulation - Oyun simulations, multi-agent systems
- Testing - Thread-lərin eyni anda başlaması
- Iterative algorithms - Hər iterasiyada sync lazım olan alqoritmlər
- Phased execution - Mərhələli icra
CyclicBarrier vs CountDownLatch
| CyclicBarrier | CountDownLatch |
|---|---|
| Reusable | Bir dəfəlik |
| Hər thread await() çağırır | countDown() və await() ayrı |
| Fixed parties | Count aşağı düşür |
| Barrier action mövcuddur | Action yoxdur |
| BrokenBarrierException | Yoxdur |
| reset() metodu var | reset() yoxdur |
Nə Zaman CyclicBarrier?
CyclicBarrier istifadə edin:
- Təkrar istifadə lazımdırsa
- Fixed sayda thread eyni nöqtədə görüşməlidir
- Hər mərhələdə synchronization lazımdır
- Barrier action lazımdır
CountDownLatch istifadə edin:
- Bir dəfəlik istifadə
- Thread-lər count down edə bilər
- Main thread digər thread-ləri gözləyir