Find Median from Data Stream: tại sao hai heap đánh bại mảng đã sắp xếp
Median của một mảng tĩnh thì đơn giản: sort một lần, lấy phần tử giữa. Constraint thú vị ở đây là các số đến từng cái một và bạn phải trả lời findMedian đúng sau mỗi lần chèn — bạn không bao giờ biết trước toàn bộ dữ liệu. Đó là điều biến bài này thành bài thiết kế. Câu hỏi không phải "làm sao tính median" mà là "cấu trúc dữ liệu nào giúp tôi duy trì median rẻ qua chuỗi insert không giới hạn?"
Constraints thu hẹp không gian tìm kiếm trước khi bạn viết một dòng. num bị giới hạn từ -10^5 đến 10^5 — không lo overflow. Tối đa 5 * 10^4 lần gọi tổng cộng. Con số cuối đó mới là tín hiệu thực sự: nếu bạn có tới năm mươi nghìn operation và findMedian được gọi thường xuyên, một giải pháp sort toàn bộ mảng mỗi lần query sẽ tốn O(n log n) mỗi lần gọi và xuống cấp nghiêm trọng. Bạn cần thứ gì đó mà chi phí-mỗi-operation vẫn bị chặn khi n tăng.
Đề bài
Thiết kế class MedianFinder hỗ trợ hai thao tác: addNum(num) thêm một số nguyên từ data stream, và findMedian() trả về median của tất cả số nguyên đã thêm cho đến thời điểm đó — median là giá trị giữa khi số lượng phần tử lẻ, hoặc trung bình của hai giá trị giữa khi số lượng phần tử chẵn. Full statement: LeetCode 295.
Ràng buộc:
- -10^5 <= num <= 10^5
- Sẽ có ít nhất một phần tử trong cấu trúc dữ liệu trước khi gọi findMedian.
- Tối đa 5 * 10^4 lần gọi đến addNum và findMedian.Approach 1: sort mỗi lần query
Thiết kế đơn giản nhất có thể: thu thập tất cả số vào một list thông thường, sort list bên trong findMedian, và đọc phần tử giữa. Không có invariant nào cần duy trì giữa các lần gọi.
class MedianFinder:
def __init__(self):
self.nums = []
def addNum(self, num: int) -> None:
self.nums.append(num)
def findMedian(self) -> float:
self.nums.sort()
n = len(self.nums)
if n % 2 == 1:
return float(self.nums[n // 2])
return (self.nums[n // 2 - 1] + self.nums[n // 2]) / 2.0public class MedianFinder {
private List<int> nums;
public MedianFinder() {
nums = new List<int>();
}
public void AddNum(int num) {
nums.Add(num);
}
public double FindMedian() {
nums.Sort();
int n = nums.Count;
if (n % 2 == 1) return (double)nums[n / 2];
return (nums[n / 2 - 1] + nums[n / 2]) / 2.0;
}
}addNum là O(1). findMedian là O(n log n) — Timsort của Python, introsort của C#, cả hai O(n log n) worst case. Space là O(n).
Vấn đề ở đây là mỗi lần gọi findMedian đều sort lại công việc đã làm. Bạn không tận dụng được gì từ lần sort trước. Nếu findMedian được gọi sau mỗi lần insert, bạn tốn O(n log n) mỗi lần và O(n² log n) tổng cộng cho n lần insert. Với n = 5 * 10^4, đó là khoảng 25 tỷ operation trong worst case — vượt xa giới hạn thời gian.
Approach 2: duy trì sorted list, chèn đúng vị trí
Fix hiển nhiên: giữ list luôn được sort. Khi đó findMedian là O(1) — chỉ cần index phần tử giữa — và chi phí sort chuyển hoàn toàn vào addNum.
Binary search tìm index chèn đúng trong O(log n). Thao tác shift mảng thực tế sau đó tốn O(n) vì mọi thứ bên phải phải di chuyển. Bạn đổi O(n log n) cho findMedian thành O(n) cho addNum — tốt hơn khi query nhiều hơn insert nhưng vẫn tuyến tính mỗi lần insert.
import bisect
class MedianFinder:
def __init__(self):
self.nums = []
def addNum(self, num: int) -> None:
bisect.insort(self.nums, num)
def findMedian(self) -> float:
n = len(self.nums)
if n % 2 == 1:
return float(self.nums[n // 2])
return (self.nums[n // 2 - 1] + self.nums[n // 2]) / 2.0public class MedianFinder {
private List<int> nums;
public MedianFinder() {
nums = new List<int>();
}
public void AddNum(int num) {
// Binary search tìm vị trí chèn, sau đó insert để duy trì thứ tự
int lo = 0, hi = nums.Count;
while (lo < hi) {
int mid = lo + (hi - lo) / 2;
if (nums[mid] < num) lo = mid + 1;
else hi = mid;
}
nums.Insert(lo, num);
}
public double FindMedian() {
int n = nums.Count;
if (n % 2 == 1) return (double)nums[n / 2];
return (nums[n / 2 - 1] + nums[n / 2]) / 2.0;
}
}addNum: O(n) — O(log n) tìm kiếm, O(n) shift. findMedian: O(1). Space: O(n).
Tốt hơn Approach 1 khi query chiếm đa số, nhưng vẫn tuyến tính mỗi lần insert. Với 5 * 10^4 lần insert đều xảy ra trước bất kỳ query nào, worst case vẫn là O(n²) tổng cộng. Chi phí shift trong List.Insert của C# và bisect.insort của Python không thể tránh khỏi với array-backed list — chèn vào giữa nghĩa là phải di chuyển memory.
Tách hai heap
Insight then chốt là: bạn không cần các số theo thứ tự đầy đủ để tìm median. Bạn chỉ cần biết phần tử nằm ở ranh giới giữa nửa dưới và nửa trên.
Chia các số thành hai nhóm: tất cả nhỏ hơn hoặc bằng median đi vào max-heap (để số lớn nhất của nửa nhỏ luôn ở trên cùng), và tất cả lớn hơn đi vào min-heap (để số nhỏ nhất của nửa lớn luôn ở trên cùng). Giữ hai heap cân bằng để chênh lệch kích thước không quá một phần tử.
Median khi đó là:
- Top của max-heap, nếu max-heap có nhiều hơn min-heap một phần tử (tổng số lẻ)
- Trung bình của hai top, nếu hai heap bằng nhau (tổng số chẵn)
Mỗi addNum thực hiện nhiều nhất hai heap operation: một lần push và tùy chọn một lần pop+push để rebalance. Cả hai là O(log n). findMedian chỉ cần peek vào một hoặc hai heap top — O(1).
import heapq
class MedianFinder:
def __init__(self):
# max_heap lưu nửa dưới; Python chỉ có min-heap,
# nên negate giá trị để simulate max behavior
self.max_heap = [] # nửa dưới
self.min_heap = [] # nửa trên
def addNum(self, num: int) -> None:
# Route vào đúng nửa
if not self.max_heap or num <= -self.max_heap[0]:
heapq.heappush(self.max_heap, -num)
else:
heapq.heappush(self.min_heap, num)
# Rebalance: max_heap có thể giữ nhiều hơn min_heap tối đa 1 phần tử
if len(self.max_heap) > len(self.min_heap) + 1:
val = -heapq.heappop(self.max_heap)
heapq.heappush(self.min_heap, val)
elif len(self.min_heap) > len(self.max_heap):
val = heapq.heappop(self.min_heap)
heapq.heappush(self.max_heap, -val)
def findMedian(self) -> float:
if len(self.max_heap) == len(self.min_heap):
return (-self.max_heap[0] + self.min_heap[0]) / 2.0
return float(-self.max_heap[0])using System.Collections.Generic;
public class MedianFinder {
// C# PriorityQueue<TElement, TPriority> mặc định là min-heap.
// Cho phía max-heap, dùng negative priority để đảo thứ tự.
private PriorityQueue<int, int> maxHeap; // nửa dưới, max ở trên qua neg priority
private PriorityQueue<int, int> minHeap; // nửa trên, min ở trên
public MedianFinder() {
maxHeap = new PriorityQueue<int, int>();
minHeap = new PriorityQueue<int, int>();
}
public void AddNum(int num) {
if (maxHeap.Count == 0 || num <= maxHeap.Peek()) {
maxHeap.Enqueue(num, -num);
} else {
minHeap.Enqueue(num, num);
}
// Rebalance
if (maxHeap.Count > minHeap.Count + 1) {
int val = maxHeap.Dequeue();
minHeap.Enqueue(val, val);
} else if (minHeap.Count > maxHeap.Count) {
int val = minHeap.Dequeue();
maxHeap.Enqueue(val, -val);
}
}
public double FindMedian() {
if (maxHeap.Count == minHeap.Count) {
return (maxHeap.Peek() + minHeap.Peek()) / 2.0;
}
return (double)maxHeap.Peek();
}
}addNum: O(log n). findMedian: O(1). Space: O(n).
Trace tay qua ví dụ trong đề
Ví dụ của bài: addNum(1), addNum(2), findMedian(), addNum(3), findMedian().
Ban đầu:
max_heap = [] (nửa dưới)
min_heap = [] (nửa trên)
addNum(1):
max_heap rỗng -> push 1 vào max_heap
max_heap = [1]
min_heap = []
Kiểm tra balance: kích thước 1 vs 0, trong giới hạn chênh lệch 1. Không rebalance.
addNum(2):
2 > top của max_heap (1) -> push 2 vào min_heap
max_heap = [1]
min_heap = [2]
Kiểm tra balance: kích thước 1 vs 1. Bằng nhau. Không rebalance.
findMedian():
Kích thước bằng nhau -> (max_heap.top + min_heap.top) / 2 = (1 + 2) / 2 = 1.5 ✓
addNum(3):
3 > top của max_heap (1) -> push 3 vào min_heap
max_heap = [1]
min_heap = [2, 3]
Kiểm tra balance: kích thước 1 vs 2. min_heap thừa một.
Pop 2 từ min_heap, push vào max_heap.
max_heap = [2, 1] (2 là max mới)
min_heap = [3]
Kích thước: 2 vs 1. max_heap giữ nhiều hơn một. Trong giới hạn.
findMedian():
max_heap có nhiều hơn một phần tử -> return max_heap.top = 2 ✓
Bước rebalance sau addNum(3) là nước đi then chốt. Một số đã vào nhầm nửa — nó lớn hơn ranh giới hiện tại — và rebalance sửa điều đó bằng cách đẩy min của nửa trên trở lại nửa dưới.
Các edge case và tại sao chúng không phải vấn đề
Một phần tử duy nhất trong stream: sau addNum(x), max_heap có một phần tử, min_heap rỗng. findMedian trả về float(-max_heap[0]) trực tiếp. Nhánh chẵn/lẻ xử lý điều này không cần special case.
Tất cả phần tử bằng nhau (ví dụ addNum(5) ba lần): mỗi số 5 thỏa num <= max_heap.top, nên cả ba ban đầu vào max_heap. Rebalancing di chuyển phần tử sang min_heap khi cần. Top của cả hai heap đều là 5, nên dù bạn lấy riêng top của max_heap hay trung bình cả hai, kết quả vẫn là 5.0. Đúng.
Số âm: điều kiện routing num <= -max_heap[0] trong Python (nơi giá trị lưu bị negate) và num <= maxHeap.Peek() trong C# hoạt động giống nhau với số âm. Không cần xử lý đặc biệt — invariant được bảo toàn độc lập với dấu.
Xen kẽ add và query: rebalance chạy sau mỗi addNum, nên invariant đúng vào thời điểm bất kỳ findMedian nào được gọi, bất kể thứ tự operation.
Overflow khi tính trung bình: trong C#, maxHeap.Peek() + minHeap.Peek() với cả hai giá trị gần 10^5 thì tổng tối đa là 2 * 10^5 = 200000 — vẫn nằm trong phạm vi int (max int khoảng 2.1 tỷ). Không có rủi ro overflow với bài này.
So sánh ba approach
| Approach | addNum | findMedian | Space | Khi nào phù hợp |
|---|---|---|---|---|
| Sort mỗi query | O(1) | O(n log n) | O(n) | Query hiếm, insert nhiều |
| Sorted insert | O(n) | O(1) | O(n) | Insert hiếm, query nhiều |
| Two heaps | O(log n) | O(1) | O(n) | Workload hỗn hợp; luôn an toàn |
Sorted insert thắng hai heap nếu insert cực kỳ hiếm và query chiếm đa số, vì O(1) findMedian miễn phí. Nhưng trong bài toán data stream, tiền đề là insert thường xuyên — chính xác là nơi sorted insert xuống cấp. Two heaps là mặc định đúng.
Tôi sẽ ship cái gì
Two heaps, không do dự. Profile O(log n) / O(1) là lý tưởng cho workload streaming, và invariant (max_heap.size bằng min_heap.size hoặc bằng min_heap.size + 1) dễ duy trì và dễ kiểm tra trong code review.
Thứ duy nhất tôi sẽ double-check trong codebase thực: simulation max-heap qua negation trong Python. Đây là Python idiom tiêu chuẩn, nhưng mỗi chỗ tương tác với max_heap bạn phải nhớ negate. Trong C#, API PriorityQueue<TElement, TPriority> làm điều này sạch hơn — bạn truyền negative của giá trị làm priority cho phía max, và dequeue luôn trả về phần tử gốc (không negate). Ít mental overhead hơn.
Follow-up trong đề gợi ý optimization bằng bucket sort khi tất cả số nằm trong [0, 100]. Bạn duy trì frequency array 101 phần tử và theo dõi tổng số lần đếm; findMedian quét tối đa 101 bucket để tìm phần tử giữa. Điều này làm addNum O(1) và findMedian O(1), với chi phí là giá trị phải nằm trong phạm vi giới hạn. Tốt hơn cho follow-up bị ràng buộc; cho trường hợp tổng quát (giá trị trong [-10^5, 10^5]), bucket array tăng lên 200001 phần tử — vẫn là fixed-size nhưng lãng phí bộ nhớ nếu stream thưa thớt.
Pattern và nơi nó chuyển giao
Approach two-heap thuộc họ rộng hơn của cấu trúc "balanced partition": bạn chia dữ liệu thành hai nửa được duy trì bởi hai heap invariant bổ sung nhau (max bên trái, min bên phải) sao cho phần tử ranh giới luôn truy cập được trong O(1). Cùng ý tưởng xuất hiện trong:
- Sliding Window Median (LC 480) — cùng cấu trúc two-heap, nhưng số cũng rời khỏi window. Bạn cần lazy deletion hoặc ordered set để xử lý việc loại bỏ. Invariant heap và logic rebalance giống hệt; phức tạp thêm là bookkeeping việc xóa.
- IPO (LC 502) — dùng max-heap của profit và min-heap của capital requirement để luôn chọn project lợi nhuận cao nhất bạn hiện có thể thực hiện. Hai heap lại, nhưng lần này phục vụ greedy selection thay vì partition.
- Kth Largest Element in a Stream (LC 703) — một min-heap kích thước k có top luôn là phần tử lớn thứ k. Phiên bản đơn giản hóa của một nửa bài này.
- Design Twitter (LC 355) — dùng heap cho bước merge của nhiều sorted tweet stream. Trực giác "duy trì ranh giới" giống nhau; heap ở đây merge thay vì partition.
Mental model để mang theo: bất cứ khi nào bài toán yêu cầu duy trì order statistic đang chạy (median, phần tử thứ k, top-k) trên tập động, và cả insert lẫn query đều phải nhanh, một hoặc hai heap cho bạn insert logarithmic và truy cập constant-time vào extreme — gần như luôn đủ.
Bài này nằm ở vị trí số 7 trong series heap / priority queue, sau Last Stone Weight, Kth Largest Element, K Closest Points to Origin, và Task Scheduler. Những bài đó đều dùng một heap duy nhất. Đây là bài đầu tiên trong series ghép hai heap lại với nhau — bước cấu trúc đi lên từ "chọn phần tử extreme" sang "duy trì phần tử giữa."
Thuộc series: Heap / Priority Queue
Đôi dòng ghi chép về những gì tôi đang xây
Nhận email khi tôi đăng bài mới — các bài kỹ thuật, không spam. Hủy đăng ký bất cứ lúc nào.
Bình luận
Chưa có bình luận nào. Hãy là người đầu tiên.
