Java Fork-Join Framework
What Is Fork-Join?
The Fork-Join Framework (introduced in Java 7) is designed to run divide-and-conquer algorithms in parallel. It splits a large task into smaller subtasks, solves them in parallel, and combines their results.
Core Components
- ForkJoinPool - the thread pool that runs the tasks
- ForkJoinTask - the abstract base class for tasks
- RecursiveTask<V> - for tasks that return a result
- RecursiveAction - for tasks that do not return a result
Simple Example - Array Sum
import java.util.concurrent.*;
class SumTask extends RecursiveTask<Long> {
private final int[] array;
private final int start, end;
private static final int THRESHOLD = 1000;
public SumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int length = end - start;
// Base case - sum a small range directly
if (length <= THRESHOLD) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
}
// Divide - split a large range into two halves
int middle = start + length / 2;
SumTask leftTask = new SumTask(array, start, middle);
SumTask rightTask = new SumTask(array, middle, end);
// Fork - schedule the left half asynchronously while this thread computes the right half
leftTask.fork();
Long rightResult = rightTask.compute();
Long leftResult = leftTask.join();
// Conquer - combine the results
return leftResult + rightResult;
}
}
// Usage
public class ForkJoinExample {
public static void main(String[] args) {
int[] array = new int[10000];
for (int i = 0; i < array.length; i++) {
array[i] = i + 1;
}
ForkJoinPool pool = new ForkJoinPool();
SumTask task = new SumTask(array, 0, array.length);
Long result = pool.invoke(task);
System.out.println("Sum: " + result);
pool.shutdown();
}
}
RecursiveAction Example
class IncrementTask extends RecursiveAction {
private final int[] array;
private final int start, end;
private static final int THRESHOLD = 1000;
public IncrementTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected void compute() {
int length = end - start;
if (length <= THRESHOLD) {
// Base case - increment every element in the range
for (int i = start; i < end; i++) {
array[i]++;
}
} else {
// Divide and conquer
int middle = start + length / 2;
IncrementTask leftTask = new IncrementTask(array, start, middle);
IncrementTask rightTask = new IncrementTask(array, middle, end);
invokeAll(leftTask, rightTask); // Run both halves, potentially in parallel, and wait for both
}
}
}
Fibonacci Example
class FibonacciTask extends RecursiveTask<Integer> {
private final int n;
public FibonacciTask(int n) {
this.n = n;
}
@Override
protected Integer compute() {
if (n <= 1) {
return n;
}
FibonacciTask f1 = new FibonacciTask(n - 1);
FibonacciTask f2 = new FibonacciTask(n - 2);
f1.fork();
int result2 = f2.compute();
int result1 = f1.join();
return result1 + result2;
}
}
// Usage
ForkJoinPool pool = new ForkJoinPool();
FibonacciTask task = new FibonacciTask(20);
Integer result = pool.invoke(task);
System.out.println("Fibonacci(20) = " + result);
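The Fibonacci task above forks a new task for every call down to n <= 1, so most of the time goes into task management rather than arithmetic. A minimal sketch of one common remedy follows: below a cutoff, fall back to plain recursion. The class names, the fibSequential helper, and the cutoff value of 15 are assumptions chosen for illustration, not measured optima.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class FibCutoffDemo {
    // Hypothetical task: forks only while n is above the sequential cutoff.
    static class FibTask extends RecursiveTask<Long> {
        private static final int SEQUENTIAL_CUTOFF = 15; // assumed value, tune by measurement
        private final int n;

        FibTask(int n) {
            this.n = n;
        }

        @Override
        protected Long compute() {
            if (n <= SEQUENTIAL_CUTOFF) {
                return fibSequential(n); // small problem: no further task creation
            }
            FibTask f1 = new FibTask(n - 1);
            FibTask f2 = new FibTask(n - 2);
            f1.fork();                    // run f1 asynchronously
            long result2 = f2.compute();  // compute f2 in the current thread
            return f1.join() + result2;
        }
    }

    // Plain recursive Fibonacci used below the cutoff.
    static long fibSequential(int n) {
        return n <= 1 ? n : fibSequential(n - 1) + fibSequential(n - 2);
    }

    static long fib(int n) {
        return ForkJoinPool.commonPool().invoke(new FibTask(n));
    }

    public static void main(String[] args) {
        System.out.println("Fibonacci(30) = " + fib(30));
    }
}
```

Recursive Fibonacci remains a teaching example either way; an iterative loop computes the same value far faster than any parallel version.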
Merge Sort Example
import java.util.Arrays;
import java.util.concurrent.RecursiveAction;
class MergeSortTask extends RecursiveAction {
private final int[] array;
private final int start, end;
private static final int THRESHOLD = 100;
public MergeSortTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if (end - start <= THRESHOLD) {
// Sequential sort for small arrays
Arrays.sort(array, start, end);
} else {
int middle = start + (end - start) / 2; // avoids int overflow for very large indices
MergeSortTask leftTask = new MergeSortTask(array, start, middle);
MergeSortTask rightTask = new MergeSortTask(array, middle, end);
invokeAll(leftTask, rightTask);
merge(array, start, middle, end);
}
}
private void merge(int[] array, int start, int middle, int end) {
int[] temp = new int[end - start];
int i = start, j = middle, k = 0;
while (i < middle && j < end) {
temp[k++] = array[i] <= array[j] ? array[i++] : array[j++];
}
while (i < middle) temp[k++] = array[i++];
while (j < end) temp[k++] = array[j++];
System.arraycopy(temp, 0, array, start, temp.length);
}
}
Work-Stealing
The Fork-Join Framework uses a "work-stealing" algorithm:
// Each worker thread has its own deque (double-ended queue) of tasks
// A worker takes its own tasks from the tail of its deque
// Idle workers steal tasks from the head of other workers' deques
ForkJoinPool pool = new ForkJoinPool(); // parallelism defaults to the number of available processors
// or
ForkJoinPool customPool = new ForkJoinPool(8); // target parallelism of 8 threads
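Work-stealing can be observed indirectly through ForkJoinPool.getStealCount(), which reports an estimate of how many tasks were executed by a thread other than the one that submitted them. The sketch below is a minimal illustration under stated assumptions: the RangeSum task, the sumRange helper, and the pool size of 4 are hypothetical, and the steal count differs from run to run.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class StealCountDemo {
    // Hypothetical task: sums the integers in [from, to) by recursive halving.
    static class RangeSum extends RecursiveTask<Long> {
        private static final int THRESHOLD = 1_000;
        private final int from, to;

        RangeSum(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += i;
                }
                return sum;
            }
            int mid = from + (to - from) / 2;
            RangeSum left = new RangeSum(from, mid);
            RangeSum right = new RangeSum(mid, to);
            left.fork();                      // queued on this worker's deque; an idle worker may steal it
            long rightSum = right.compute();  // this worker continues with the right half
            return left.join() + rightSum;
        }
    }

    static long sumRange(ForkJoinPool pool, int from, int to) {
        return pool.invoke(new RangeSum(from, to));
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            System.out.println("Sum: " + sumRange(pool, 0, 1_000_000));
            // An estimate only; the value varies between runs and machines.
            System.out.println("Steal count: " + pool.getStealCount());
        } finally {
            pool.shutdown();
        }
    }
}
```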
ForkJoinPool Configuration
// System-wide common pool
ForkJoinPool commonPool = ForkJoinPool.commonPool();
int parallelism = commonPool.getParallelism();
// Custom pool
ForkJoinPool customPool = new ForkJoinPool(
4, // parallelism
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, // exception handler
true // async mode
);
// Submit task
Future<Integer> future = customPool.submit(new FibonacciTask(30));
Integer result = future.get(); // blocks until done; throws InterruptedException and ExecutionException
Performance Tips
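Choosing a Threshold
class OptimizedSumTask extends RecursiveTask<Long> {
// Fields, the constructor, and the computeDirectly()/forkAndJoin() helpers are omitted for brevity
// The threshold should not be too small (task-creation overhead dominates)
// The threshold should not be too large (parallelism is lost)
private static final int THRESHOLD = 10000; // Example value; tune it by measurement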
@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
// Sequential computation
return computeDirectly();
}
// Fork-join logic
return forkAndJoin();
}
}
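One rough way to explore the trade-off is to run the same sum with several thresholds and compare timings. The sketch below assumes a hypothetical TunableSumTask that takes its threshold as a constructor argument; the three threshold values are arbitrary. The timings are naive wall-clock measurements affected by JIT warm-up, so for real tuning a benchmark harness such as JMH is the safer choice.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class ThresholdDemo {
    // Hypothetical sum task whose threshold is supplied by the caller.
    static class TunableSumTask extends RecursiveTask<Long> {
        private final int[] data;
        private final int start, end, threshold;

        TunableSumTask(int[] data, int start, int end, int threshold) {
            this.data = data;
            this.start = start;
            this.end = end;
            this.threshold = Math.max(1, threshold); // guard against endless splitting
        }

        @Override
        protected Long compute() {
            if (end - start <= threshold) {
                long sum = 0;
                for (int i = start; i < end; i++) {
                    sum += data[i];
                }
                return sum;
            }
            int middle = start + (end - start) / 2;
            TunableSumTask left = new TunableSumTask(data, start, middle, threshold);
            TunableSumTask right = new TunableSumTask(data, middle, end, threshold);
            left.fork();
            long rightSum = right.compute();
            return left.join() + rightSum;
        }
    }

    static long parallelSum(ForkJoinPool pool, int[] data, int threshold) {
        return pool.invoke(new TunableSumTask(data, 0, data.length, threshold));
    }

    public static void main(String[] args) {
        int[] data = new int[5_000_000];
        Arrays.fill(data, 1);
        ForkJoinPool pool = new ForkJoinPool();
        try {
            for (int threshold : new int[] {100, 10_000, 1_000_000}) {
                long t0 = System.nanoTime();
                long sum = parallelSum(pool, data, threshold);
                long micros = (System.nanoTime() - t0) / 1_000;
                System.out.printf("threshold=%d sum=%d time=%d us%n", threshold, sum, micros);
            }
        } finally {
            pool.shutdown();
        }
    }
}
```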
fork() vs invoke()
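// fork() - asynchronous: schedules leftTask and returns immediately
leftTask.fork();
Long rightResult = rightTask.compute();
Long leftResult = leftTask.join();
// invoke() - synchronous alternative: runs the task and waits for its result,
// so two invoke() calls in a row execute one after the other
Long leftInvoked = leftTask.invoke();
Long rightInvoked = rightTask.invoke();
// invokeAll() - forks multiple tasks and returns once all of them have completed
invokeAll(task1, task2, task3);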
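invokeAll() itself returns nothing, so when it is used with RecursiveTask the results are read afterwards with join(), which returns immediately because both tasks have already completed. The following is a minimal sketch of that pattern; MaxTask and parallelMax are hypothetical names, and the input array is assumed to be non-empty.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class InvokeAllDemo {
    // Hypothetical task: finds the maximum of data[start..end).
    static class MaxTask extends RecursiveTask<Integer> {
        private static final int THRESHOLD = 1_000;
        private final int[] data;
        private final int start, end;

        MaxTask(int[] data, int start, int end) {
            this.data = data;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {
            if (end - start <= THRESHOLD) {
                int max = Integer.MIN_VALUE;
                for (int i = start; i < end; i++) {
                    max = Math.max(max, data[i]);
                }
                return max;
            }
            int middle = start + (end - start) / 2;
            MaxTask left = new MaxTask(data, start, middle);
            MaxTask right = new MaxTask(data, middle, end);
            invokeAll(left, right);                       // returns after both subtasks finish
            return Math.max(left.join(), right.join());   // join() only fetches the results here
        }
    }

    static int parallelMax(int[] data) {
        return ForkJoinPool.commonPool().invoke(new MaxTask(data, 0, data.length));
    }

    public static void main(String[] args) {
        int[] data = new int[10_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i % 977;
        }
        data[4321] = 5_000;
        System.out.println("Max: " + parallelMax(data));
    }
}
```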
Comparison with the Stream API
int[] array = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
// Sequential stream
long sequentialSum = Arrays.stream(array)
.asLongStream()
.sum();
// Parallel stream (uses the common Fork-Join pool internally)
long parallelSum = Arrays.stream(array)
.parallel()
.asLongStream()
.sum();
// Custom Fork-Join
ForkJoinPool pool = new ForkJoinPool();
SumTask task = new SumTask(array, 0, array.length);
Long forkJoinSum = pool.invoke(task);
Best Practices
- Choose a suitable threshold:
private static final int THRESHOLD = 1000; // Determine this experimentally
- Use Fork-Join for CPU-intensive tasks:
// Good: mathematical calculations
// Good: image processing
// Bad: I/O operations
// Bad: database queries
- Manage pools properly:
ForkJoinPool pool = new ForkJoinPool();
try {
return pool.invoke(task);
} finally {
pool.shutdown();
}
- Prefer the common pool where it fits:
// System-wide common pool; supplyAsync normally runs on it
// invoke() is the public entry point, while compute() is protected
CompletableFuture.supplyAsync(() -> task.invoke());
Common Pool vs. Custom Pool
// Common pool - recommended for most cases
ForkJoinTask<Integer> task = new MyTask();
Integer result = task.fork().join(); // Called outside a pool, fork() uses the common pool
// Custom pool - for special requirements
ForkJoinPool customPool = new ForkJoinPool(2);
Integer customResult = customPool.invoke(new MyTask()); // Use a fresh task; a completed task is not executed again
customPool.shutdown();
Debugging
public class DebuggableForkJoinTask extends RecursiveTask<Integer> {
@Override
protected Integer compute() {
System.out.println("Task running on: " +
Thread.currentThread().getName());
int result = 0; // Placeholder; the real computation goes here
return result;
}
}
// Pool monitoring
ForkJoinPool pool = new ForkJoinPool();
System.out.println("Parallelism: " + pool.getParallelism());
System.out.println("Active threads: " + pool.getActiveThreadCount());
System.out.println("Running threads: " + pool.getRunningThreadCount());
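The monitoring calls above can be gathered into a single snapshot and printed before, during, and after a task runs. This is a minimal sketch; the describe helper, the pool size of 3, and the busy loop are assumptions for illustration. All counters are estimates taken at one instant, so the printed values vary from run to run.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;

class PoolStatsDemo {
    // Hypothetical helper: formats a one-line snapshot of the pool's counters.
    static String describe(ForkJoinPool pool) {
        return "parallelism=" + pool.getParallelism()
                + ", poolSize=" + pool.getPoolSize()
                + ", active=" + pool.getActiveThreadCount()
                + ", running=" + pool.getRunningThreadCount()
                + ", queuedTasks=" + pool.getQueuedTaskCount()
                + ", steals=" + pool.getStealCount();
    }

    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool(3);
        try {
            System.out.println("Before: " + describe(pool));
            // Submit some CPU-bound busy work as a Runnable.
            ForkJoinTask<?> task = pool.submit(() -> {
                long x = 0;
                for (int i = 0; i < 50_000_000; i++) {
                    x += i;
                }
                System.out.println("Task ran on: " + Thread.currentThread().getName()
                        + " (x=" + x + ")");
            });
            System.out.println("During: " + describe(pool));
            task.get(); // wait for the task to finish
            System.out.println("After: " + describe(pool));
        } finally {
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }
}
```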
Advantages
- Work-stealing - effective load balancing
- CPU utilization - makes use of all cores
- Divide-and-conquer - natural support for recursive parallel algorithms
- Built-in - ships with the JDK, no extra library needed
Limitations
- Suited only to CPU-bound tasks
- Not suitable for I/O operations
- Memory overhead - the cost of creating threads
- Complexity - a suitable threshold has to be chosen