Task Scheduler: bài toán idle time và khi nào công thức đánh bại heap
Bài toán (LeetCode 621): cho một danh sách CPU task, mỗi task được đánh dấu bằng một chữ cái A–Z, và một giá trị cooldown n. Mỗi khoảng thời gian CPU có thể thực thi một task hoặc idle. Cùng một task không thể chạy lại cho đến khi ít nhất n khoảng thời gian đã trôi qua kể từ lần chạy trước. Tìm số khoảng thời gian tối thiểu để hoàn thành tất cả.
Ví dụ: tasks = ["A","A","A","B","B","B"], n = 2. Một lịch hợp lệ là A -> B -> idle -> A -> B -> idle -> A -> B — tổng cộng 8 khoảng.
Ràng buộc cooldown làm bài không đơn giản. Không có cooldown, đáp án chỉ là len(tasks). Khi có cooldown, idle slot có thể xuất hiện — và câu hỏi là idle có thực sự cần thiết không, nếu có thì bao nhiêu.
Ràng buộc dẫn đến gì
1 <= tasks.length <= 10^4. Tối đa mười nghìn task. Đủ nhỏ để simulation từng bước, nhưng O(m * log k) với m là tổng số khoảng thời gian đầu ra (có thể lớn hơn tasks.length khi có idle slot) là cái giá phải trả cho simulation. Ở cực đoan — một loại task lặp 10.000 lần với n = 100 — tổng số khoảng thời gian lên tới khoảng 1.000.000. Vẫn nhanh, nhưng công thức toán học bỏ qua toàn bộ phần đó.
tasks[i] là chữ cái in hoa. Tối đa 26 loại task khác nhau. Đây là ràng buộc làm mọi thứ khả thi. Heap không bao giờ có quá 26 phần tử. Frequency counting vừa trong một mảng cố định. max_count trong công thức bị giới hạn bởi 26.
0 <= n <= 100. Khi n = 0, không có cooldown và chỉ cần chạy mọi task liên tiếp — đáp án là len(tasks). Khi n = 100, có thể cần 100 idle slot giữa mỗi cặp task giống nhau có tần suất cao. Cả hai cực đoan đều được xử lý gọn bởi cả hai approach.
Approach 1: Heap simulation
Cách trực tiếp mô phỏng đồng hồ CPU từng tick. Tại mỗi tick, muốn chọn task có tần suất còn lại cao nhất và đủ điều kiện chạy — nên max-heap được sắp xếp theo số lần còn lại là cấu trúc dữ liệu tự nhiên nhất. Các task đang trong thời gian cooldown đợi trong một queue; khi cooldown hết, chúng quay lại heap.
Python chỉ có min-heap, nên tôi phủ dấu tất cả count để giả lập max-heap. Cái freq += 1 ở vòng lặp trông hơi lạ — thực ra đang cộng 1 vào một số âm, hiệu quả là giảm count tuyệt đối đi 1.
import heapq
from collections import Counter, deque
from typing import List
class Solution:
def leastInterval(self, tasks: List[str], n: int) -> int:
count = Counter(tasks)
max_heap = [-freq for freq in count.values()]
heapq.heapify(max_heap)
# Mỗi entry trong waiting_queue: (negated_freq, thời_điểm_được_phép_chạy)
waiting_queue = deque()
time = 0
while max_heap or waiting_queue:
time += 1
# Giải phóng task nào vừa hết cooldown
if waiting_queue and waiting_queue[0][1] == time:
freq = waiting_queue.popleft()[0]
heapq.heappush(max_heap, freq)
if max_heap:
freq = heapq.heappop(max_heap)
freq += 1 # giảm count tuyệt đối (freq đang âm)
if freq < 0:
# Còn lần chạy nữa; cho vào cooldown
waiting_queue.append((freq, time + n + 1))
return timeusing System;
using System.Collections.Generic;
using System.Linq;
public class Solution {
public int LeastInterval(char[] tasks, int n) {
var count = new Dictionary<char, int>();
foreach (char task in tasks)
count[task] = count.GetValueOrDefault(task, 0) + 1;
// .NET PriorityQueue là min-heap; dùng priority âm để giả lập max
var maxHeap = new PriorityQueue<int, int>();
foreach (int freq in count.Values)
maxHeap.Enqueue(freq, -freq);
var waitingQueue = new Queue<(int freq, int availableTime)>();
int time = 0;
while (maxHeap.Count > 0 || waitingQueue.Count > 0) {
time++;
if (waitingQueue.Count > 0 && waitingQueue.Peek().availableTime == time) {
var (freq, _) = waitingQueue.Dequeue();
maxHeap.Enqueue(freq, -freq);
}
if (maxHeap.Count > 0) {
int freq = maxHeap.Dequeue() - 1;
if (freq > 0)
waitingQueue.Enqueue((freq, time + n + 1));
}
}
return time;
}
}Trace từng bước ví dụ đầu tiên
tasks = ["A","A","A","B","B","B"], n = 2. Sau khi đếm: A:3, B:3. Heap chứa [-3, -3] (hai entry, cả hai có count 3).
Kết quả: 8. Đúng với đáp án mong đợi.
Các idle slot tại t=3 và t=6 xuất hiện vì tất cả task đang trong cooldown cùng lúc. Đơn giản là không có gì để chạy.
Độ phức tạp
Time: O(m * log k) trong đó m là tổng số khoảng thời gian đầu ra và k là số loại task khác nhau (tối đa 26). Mỗi khoảng thực hiện nhiều nhất một heap push và một heap pop, mỗi cái tốn O(log k). Trong trường hợp xấu nhất, m lớn hơn len(tasks) rất nhiều — ví dụ một loại task duy nhất với n lớn.
Space: O(k) — heap và waiting queue cộng lại giữ tối đa k entry, một cái mỗi loại task.
Approach 2: Công thức toán học
Simulation đúng nhưng làm nhiều việc hơn cần thiết. Thực ra không cần biết task nào chạy ở tick nào — chỉ cần đếm.
Insight đến từ task có tần suất cao nhất. Gọi count của nó là max_freq. Task đó buộc tạo ra một số khoảng cách bắt buộc: giữa mỗi cặp lần chạy liên tiếp, cần ít nhất n khoảng. Với max_freq lần chạy, có max_freq - 1 khoảng cách, mỗi cái có kích thước tối thiểu là n. Cấu trúc tối thiểu trông như sau:
[chunk][gap][chunk][gap]...[chunk][gap][chunk_cuoi]
(n+1) (n+1) (n+1) (max_count)
Mỗi chunk đầy có kích thước n + 1 — một slot cho task có tần suất cao nhất cộng n slot cho các task khác hoặc idle. Có max_freq - 1 chunk đầy như vậy. Chunk cuối chỉ chứa các task có cùng max_freq — gọi count đó là max_count. Không có gì sau lần chạy cuối cần cooldown.
Công thức: (max_freq - 1) * (n + 1) + max_count.
Nhưng công thức này có thể đếm thiếu. Nếu có đủ loại task khác nhau hoặc đủ volume để lấp đầy mọi khoảng và tràn ra nhiều khoảng hơn, đáp án chỉ là len(tasks). Do đó: max((max_freq - 1) * (n + 1) + max_count, len(tasks)).
from collections import Counter
from typing import List
class Solution:
def leastInterval(self, tasks: List[str], n: int) -> int:
count = Counter(tasks)
max_freq = max(count.values())
max_count = sum(1 for freq in count.values() if freq == max_freq)
formula_result = (max_freq - 1) * (n + 1) + max_count
return max(formula_result, len(tasks))using System;
using System.Collections.Generic;
using System.Linq;
public class Solution {
public int LeastInterval(char[] tasks, int n) {
var count = new Dictionary<char, int>();
foreach (char task in tasks)
count[task] = count.GetValueOrDefault(task, 0) + 1;
int maxFreq = count.Values.Max();
int maxCount = count.Values.Count(freq => freq == maxFreq);
int formulaResult = (maxFreq - 1) * (n + 1) + maxCount;
return Math.Max(formulaResult, tasks.Length);
}
}Trace ví dụ 1 với công thức
tasks = ["A","A","A","B","B","B"], n = 2. Count: A:3, B:3. max_freq = 3, max_count = 2.
(3 - 1) * (2 + 1) + 2 = 2 * 3 + 2 = 8
max(8, 6) = 8
Cấu trúc lịch trình:
chunk 1: A B _
chunk 2: A B _
cuoi: A B
Ba cột, 8 slot. Khớp.
Trace ví dụ 2
tasks = ["A","C","A","B","D","B"], n = 1. Count: A:2, B:2, C:1, D:1. max_freq = 2, max_count = 2.
(2 - 1) * (1 + 1) + 2 = 1 * 2 + 2 = 4
max(4, 6) = 6
Ở đây len(tasks) = 6 thắng. Lịch trình là A -> B -> C -> D -> A -> B — không cần idle vì có đủ đa dạng task để lấp đầy các khoảng. Đây là trường hợp công thức chunk không mô tả được cấu trúc, nhưng max(formula, len(tasks)) bắt đúng.
Trace ví dụ 3
tasks = ["A","A","A","B","B","B"], n = 3. Count: A:3, B:3. max_freq = 3, max_count = 2.
(3 - 1) * (3 + 1) + 2 = 2 * 4 + 2 = 10
max(10, 6) = 10
Cấu trúc:
chunk 1: A B _ _
chunk 2: A B _ _
cuoi: A B
10 slot. Hai idle slot mỗi chunk vì chỉ có hai loại task nhưng cooldown là 3.
Độ phức tạp
Time: O(k) với k <= 26 — một lần duyệt qua frequency map. Thực tế là O(1) vì k bị giới hạn. Bước khởi tạo Counter tốn O(N) số task, thứ này chiếm ưu thế.
Space: O(k) cho frequency map.
Các edge case thực sự quan trọng
n = 0, không có cooldown. Cả hai approach đều xử lý được, nhưng qua các đường khác nhau. Công thức cho (max_freq - 1) * 1 + max_count, và vì len(tasks) >= max_freq nên max luôn trả về len(tasks). Trong simulation, waiting queue không bao giờ chặn gì vì time + 0 + 1 = time + 1 — task quay lại heap ngay tick tiếp theo và luôn có thứ để chạy.
Một loại task duy nhất. tasks = ["A","A","A","A"], n = 2. max_freq = 4, max_count = 1. Công thức: 3 * 3 + 1 = 10. Lịch trình là A _ _ A _ _ A _ _ A — toàn idle. Công thức bắt gọn vì cấu trúc chunk rõ ràng.
Tất cả task có tần suất bằng nhau. tasks = ["A","B","C","D"], n = 2. Mọi tần suất đều là 1. max_freq = 1, max_count = 4. Công thức: 0 * 3 + 4 = 4. max(4, 4) = 4. Không cần idle; một lượt qua tất cả task theo bất kỳ thứ tự nào.
Nhiều loại task cùng max_freq. Nếu nhiều loại task có cùng max_freq, chunk cuối lấp đầy bằng tất cả chúng, và công thức tính qua max_count. Simulation xử lý tự nhiên — heap chỉ có nhiều ứng viên ngang hàng và chọn bất kỳ cái nào (không quan trọng cái nào).
Task tần suất cao với n lớn. tasks = ["A"] * 10000, n = 100. Công thức: 9999 * 101 + 1 = 1.009.900 khoảng thời gian. Simulation sẽ lặp gần 1 triệu lần. Đây là nơi công thức O(1) tốt hơn hẳn — không phải gấp 10 lần, mà theo cấp độ khác.
So sánh
| Approach | Time | Space | Ghi chú |
|---|---|---|---|
| Heap simulation | O(m * log k) | O(k) | m = tổng khoảng thời gian; đúng mọi trường hợp; dễ suy luận hơn |
| Công thức toán học | O(N) | O(k) | N = len(tasks); nhanh hơn đáng kể khi idle slot chiếm nhiều |
k <= 26 nên log k thực chất là hằng số. Sự khác biệt thực sự nằm ở m so với N. Khi lịch trình không có idle time, m = N và cả hai approach trong tầm nhau. Khi idle time lớn, m >> N và công thức thắng rõ ràng.
Pattern cơ bản: greedy frequency scheduling
Bài này là instance tiêu chuẩn của một nhóm bài scheduling mà ràng buộc là "khoảng cách tối thiểu giữa các lần lặp cùng một item". Cấu trúc trong tất cả chúng đều giống nhau: item có tần suất cao nhất là nút thắt cổ chai, và mọi thứ còn lại hoặc lấp đầy các khoảng hoặc tạo thêm idle time.
Pattern này xuất hiện trong nhiều bài:
- 767. Reorganize String — cooldown bằng 1 (không có hai ký tự giống nhau liền kề). Công thức đơn giản hơn: nếu
max_freq > (len(s) + 1) / 2, bài vô nghiệm. Ngược lại, cấu trúc chunk đảm bảo cách sắp xếp hợp lệ. Không có idle; sắp xếp lại trong độ dài gốc. - 358. Rearrange String k Distance Apart — tổng quát trực tiếp: cooldown là
k. Cùng logic chunk; trả về""thay vì idle count nếu không thể. - 1054. Distant Barcodes — barcode không được có neighbors giống nhau. Tương đương Reorganize String với kiểu output khác. Cùng insight "điền vào các chỉ số chẵn trước bằng phần tử có tần suất cao nhất".
- 2365. Task Scheduler II — mỗi task có cooldown riêng (một map, không phải một
nduy nhất). Simulation với map-based cooldown tracker thay thế uniform queue.
Trong tất cả chúng, quan sát cốt lõi là: đếm tần suất, tìm nút thắt cổ chai (tần suất lớn nhất), xây dựng xung quanh nó. Approach công thức hoạt động khi có cấu trúc closed-form; simulation là fallback khi cooldown từng loại task khác nhau.
Tôi sẽ ship phiên bản công thức — năm dòng, O(N), logic tự giải thích một khi bạn vẽ được hình chunk. Heap simulation là cái tôi dùng trong phỏng vấn nếu quên mất cách suy ra công thức, hoặc trong các bài mà cooldown thay đổi theo từng loại task. Cả hai đều đáng biết vì chúng đại diện cho hai mức độ insight khác nhau vào cùng một bài toán.
Series heap đến đây đã cover: trích xuất phần tử thứ k (Kth Largest), top-k selection (Top K Frequent Elements), k-way merge (Merge K Sorted Lists), và lịch biểu với truy cập theo thứ tự (Task Scheduler). Bước tự nhiên tiếp theo là bài running median — hai heap, một max và một min, duy trì điểm chia khi các phần tử đổ vào theo dòng. Đó là nơi cấu trúc dữ liệu heap thể hiện đúng công dụng thực sự của nó.
Thuộc series: Heap / Priority Queue
Đôi dòng ghi chép về những gì tôi đang xây
Nhận email khi tôi đăng bài mới — các bài kỹ thuật, không spam. Hủy đăng ký bất cứ lúc nào.
Bình luận
Chưa có bình luận nào. Hãy là người đầu tiên.
