Java CountDownLatch
CountDownLatch Nədir?
CountDownLatch bir və ya bir neçə thread-in digər thread-lərdə müəyyən əməliyyatlar bitənə qədər gözləməsini təmin edən synchronization vasitəsidir. Counter 0-a çatanda gözləyən thread-lər davam edir.
Xüsusiyyətlər:
- Bir dəfəlik istifadə (reusable deyil)
- Count aşağı düşür (count down)
- 0-a çatanda gözləyən thread-lər azad olur
Basic Usage
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3); // 3 event gözləyirik
// 3 worker thread
for (int i = 1; i <= 3; i++) {
final int workerId = i;
new Thread(() -> {
System.out.println("Worker " + workerId + " işə başladı");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Worker " + workerId + " bitdi");
latch.countDown(); // Counter azalt
}).start();
}
System.out.println("Main thread gözləyir...");
latch.await(); // Bütün worker-lər bitənə qədər gözlə
System.out.println("Bütün işlər bitdi!");
}
}
Service Başlanğıcı Gözləmək
class Application {
private final CountDownLatch latch = new CountDownLatch(3);
public void startServices() {
// Database service
new Thread(() -> {
System.out.println("Database başlayır...");
sleep(2000);
System.out.println("Database hazırdır");
latch.countDown();
}).start();
// Cache service
new Thread(() -> {
System.out.println("Cache başlayır...");
sleep(1500);
System.out.println("Cache hazırdır");
latch.countDown();
}).start();
// Message queue service
new Thread(() -> {
System.out.println("Message Queue başlayır...");
sleep(2500);
System.out.println("Message Queue hazırdır");
latch.countDown();
}).start();
}
public void waitForStartup() throws InterruptedException {
latch.await();
System.out.println("Bütün servislər hazırdır. Application başlayır!");
}
private void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// İstifadə
Application app = new Application();
app.startServices();
app.waitForStartup();
await() with Timeout
class TimeoutExample {
public void processWithTimeout() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(5);
// 5 task başlat
for (int i = 1; i <= 5; i++) {
final int taskId = i;
new Thread(() -> {
try {
Thread.sleep(taskId * 1000);
System.out.println("Task " + taskId + " bitdi");
latch.countDown();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
// Maksimum 3 saniyə gözlə
boolean completed = latch.await(3, TimeUnit.SECONDS);
if (completed) {
System.out.println("Bütün task-lar bitdi");
} else {
System.out.println("Timeout! Bəzi task-lar hələ işləyir");
System.out.println("Qalan task sayı: " + latch.getCount());
}
}
}
Paralel Task-ların Koordinasiyası
class ParallelProcessor {
public void processFiles(List<String> files) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(files.size());
List<String> results = new CopyOnWriteArrayList<>();
for (String file : files) {
new Thread(() -> {
try {
// File-ı emal et
String result = processFile(file);
results.add(result);
System.out.println("Processed: " + file);
} finally {
latch.countDown(); // Həmişə countDown edin
}
}).start();
}
// Bütün file-ların emalını gözlə
latch.await();
System.out.println("Nəticələr: " + results);
}
private String processFile(String file) {
// File processing logic
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Result for " + file;
}
}
Start Signal Pattern
class RaceExample {
public void startRace() throws InterruptedException {
int runners = 5;
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(runners);
// Qaçışçılar hazırlanır
for (int i = 1; i <= runners; i++) {
final int runnerId = i;
new Thread(() -> {
System.out.println("Qaçışçı " + runnerId + " hazırdır");
try {
startSignal.await(); // Start signalını gözlə
System.out.println("Qaçışçı " + runnerId + " qaçır!");
Thread.sleep((int)(Math.random() * 3000));
System.out.println("Qaçışçı " + runnerId + " finişə çatdı");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
doneSignal.countDown();
}
}).start();
}
Thread.sleep(2000);
System.out.println("START!");
startSignal.countDown(); // Bütün qaçışçılar eyni anda başlayır
doneSignal.await(); // Hamısının bitməsini gözlə
System.out.println("Yarış bitdi!");
}
}
ExecutorService ilə İstifadə
class ExecutorWithLatch {
public void processTasks() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(4);
List<String> tasks = Arrays.asList("Task1", "Task2", "Task3", "Task4", "Task5");
CountDownLatch latch = new CountDownLatch(tasks.size());
for (String task : tasks) {
executor.submit(() -> {
try {
System.out.println("Processing: " + task);
Thread.sleep(2000);
System.out.println("Completed: " + task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
});
}
latch.await(); // Bütün task-lar bitənə qədər gözlə
executor.shutdown();
System.out.println("Bütün task-lar tamamlandı");
}
}
Multiple CountDownLatch
class Pipeline {
public void runPipeline() throws InterruptedException {
CountDownLatch stage1 = new CountDownLatch(2);
CountDownLatch stage2 = new CountDownLatch(2);
CountDownLatch stage3 = new CountDownLatch(1);
// Stage 1 - Data yükləmə
startWorkers(2, "Stage1-Worker", stage1, null);
stage1.await();
System.out.println("Stage 1 tamamlandı");
// Stage 2 - Data emalı
startWorkers(2, "Stage2-Worker", stage2, null);
stage2.await();
System.out.println("Stage 2 tamamlandı");
// Stage 3 - Nəticənin yazılması
startWorkers(1, "Stage3-Worker", stage3, null);
stage3.await();
System.out.println("Stage 3 tamamlandı");
System.out.println("Pipeline bitdi!");
}
private void startWorkers(int count, String prefix,
CountDownLatch latch,
CountDownLatch waitFor) {
for (int i = 1; i <= count; i++) {
final int workerId = i;
new Thread(() -> {
try {
if (waitFor != null) {
waitFor.await();
}
System.out.println(prefix + "-" + workerId + " işləyir");
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
}).start();
}
}
}
Best Practices
-
Həmişə finally-də countDown() çağırın:
try {
// work
} finally {
latch.countDown(); // Mütləq
} -
Timeout istifadə edin:
if (!latch.await(10, TimeUnit.SECONDS)) {
// Timeout handling
} -
Count-u düzgün hesablayın:
int taskCount = tasks.size();
CountDownLatch latch = new CountDownLatch(taskCount); -
Reusable deyil:
// Hər dəfə yeni latch yaradın
CountDownLatch latch1 = new CountDownLatch(3);
// ... istifadə et
// Yeni istifadə üçün
CountDownLatch latch2 = new CountDownLatch(3);
Common Use Cases
- Application startup - Servislər başlayana qədər gözləmək
- Paralel processing - Bütün task-lar bitənə qədər gözləmək
- Testing - Testdə thread-lərin koordinasiyası
- Race start - Bütün thread-ləri eyni anda başlatmaq
- Pipeline - Mərhələli proseslərin koordinasiyası
CountDownLatch vs CyclicBarrier
| CountDownLatch | CyclicBarrier |
|---|---|
| Bir dəfəlik istifadə | Reusable |
| countDown() və await() | await() only |
| Count aşağı düşür | Fixed count |
| Thread-lər count down etməyə bilər | Hər thread await() çağırmalıdır |