Olox Olox

Theme

Documentation
Back to Home

Heap & Priority Queue Patterns

9 min read

Heap & Priority Queue Patterns

📚 Overview

Heaps are essential for problems involving streaming data, top-k elements, and priority-based processing.


1️⃣ Top K Elements

Kth Largest Element (LC 215)

import heapq

def find_kth_largest(nums: list[int], k: int) -> int:
    """Find kth largest element - O(n log k)"""
    # Min heap of size k
    heap = nums[:k]
    heapq.heapify(heap)
    
    for num in nums[k:]:
        if num > heap[0]:
            heapq.heapreplace(heap, num)
    
    return heap[0]

def find_kth_largest_quickselect(nums: list[int], k: int) -> int:
    """Quickselect approach - O(n) average"""
    import random
    
    def partition(left: int, right: int, pivot_idx: int) -> int:
        pivot = nums[pivot_idx]
        nums[pivot_idx], nums[right] = nums[right], nums[pivot_idx]
        store_idx = left
        
        for i in range(left, right):
            if nums[i] > pivot:
                nums[i], nums[store_idx] = nums[store_idx], nums[i]
                store_idx += 1
        
        nums[store_idx], nums[right] = nums[right], nums[store_idx]
        return store_idx
    
    left, right = 0, len(nums) - 1
    target = k - 1
    
    while True:
        pivot_idx = random.randint(left, right)
        pivot_idx = partition(left, right, pivot_idx)
        
        if pivot_idx == target:
            return nums[pivot_idx]
        elif pivot_idx < target:
            left = pivot_idx + 1
        else:
            right = pivot_idx - 1

Top K Frequent Elements (LC 347)

def top_k_frequent(nums: list[int], k: int) -> list[int]:
    """Top k most frequent elements"""
    from collections import Counter
    
    freq = Counter(nums)
    
    # Method 1: Heap
    return heapq.nlargest(k, freq.keys(), key=freq.get)
    
def top_k_frequent_bucket(nums: list[int], k: int) -> list[int]:
    """Bucket sort approach - O(n)"""
    from collections import Counter
    
    freq = Counter(nums)
    
    # Bucket by frequency
    buckets = [[] for _ in range(len(nums) + 1)]
    for num, count in freq.items():
        buckets[count].append(num)
    
    result = []
    for i in range(len(buckets) - 1, -1, -1):
        result.extend(buckets[i])
        if len(result) >= k:
            return result[:k]
    
    return result

K Closest Points to Origin (LC 973)

def k_closest(points: list[list[int]], k: int) -> list[list[int]]:
    """K closest points to origin"""
    # Max heap to keep k smallest distances
    heap = []
    
    for x, y in points:
        dist = x*x + y*y
        
        if len(heap) < k:
            heapq.heappush(heap, (-dist, x, y))
        elif dist < -heap[0][0]:
            heapq.heapreplace(heap, (-dist, x, y))
    
    return [[x, y] for _, x, y in heap]

2️⃣ Merge K Sorted

Merge K Sorted Lists (LC 23)

def merge_k_lists(lists: list) -> 'ListNode':
    """Merge k sorted linked lists"""
    heap = []
    
    for i, lst in enumerate(lists):
        if lst:
            heapq.heappush(heap, (lst.val, i, lst))
    
    dummy = ListNode(0)
    current = dummy
    
    while heap:
        val, i, node = heapq.heappop(heap)
        current.next = node
        current = current.next
        
        if node.next:
            heapq.heappush(heap, (node.next.val, i, node.next))
    
    return dummy.next

Smallest Range Covering Elements from K Lists (LC 632)

def smallest_range(nums: list[list[int]]) -> list[int]:
    """Smallest range containing at least one element from each list"""
    heap = []
    max_val = float('-inf')
    
    # Initialize with first element from each list
    for i, lst in enumerate(nums):
        heapq.heappush(heap, (lst[0], i, 0))
        max_val = max(max_val, lst[0])
    
    result = [float('-inf'), float('inf')]
    
    while True:
        min_val, i, j = heapq.heappop(heap)
        
        if max_val - min_val < result[1] - result[0]:
            result = [min_val, max_val]
        
        if j + 1 == len(nums[i]):
            break
        
        next_val = nums[i][j + 1]
        heapq.heappush(heap, (next_val, i, j + 1))
        max_val = max(max_val, next_val)
    
    return result

3️⃣ Sliding Window with Heap

Sliding Window Maximum (LC 239)

def max_sliding_window(nums: list[int], k: int) -> list[int]:
    """Maximum in each sliding window of size k"""
    from collections import deque
    
    # Monotonic decreasing deque
    dq = deque()
    result = []
    
    for i, num in enumerate(nums):
        # Remove elements outside window
        while dq and dq[0] <= i - k:
            dq.popleft()
        
        # Maintain decreasing order
        while dq and nums[dq[-1]] < num:
            dq.pop()
        
        dq.append(i)
        
        if i >= k - 1:
            result.append(nums[dq[0]])
    
    return result

def max_sliding_window_heap(nums: list[int], k: int) -> list[int]:
    """Heap-based solution"""
    heap = []  # Max heap: (-value, index)
    result = []
    
    for i in range(len(nums)):
        heapq.heappush(heap, (-nums[i], i))
        
        if i >= k - 1:
            # Remove elements outside window
            while heap[0][1] <= i - k:
                heapq.heappop(heap)
            
            result.append(-heap[0][0])
    
    return result

Sliding Window Median (LC 480)

def median_sliding_window(nums: list[int], k: int) -> list[float]:
    """Median of each sliding window"""
    from sortedcontainers import SortedList
    
    window = SortedList(nums[:k])
    result = []
    
    def get_median():
        if k % 2 == 1:
            return float(window[k // 2])
        return (window[k // 2 - 1] + window[k // 2]) / 2
    
    result.append(get_median())
    
    for i in range(k, len(nums)):
        window.add(nums[i])
        window.remove(nums[i - k])
        result.append(get_median())
    
    return result

4️⃣ Two Heaps Pattern

Find Median from Data Stream (LC 295)

class MedianFinder:
    """Running median using two heaps"""
    def __init__(self):
        self.small = []  # Max heap (negated)
        self.large = []  # Min heap
    
    def addNum(self, num: int) -> None:
        # Add to max heap first
        heapq.heappush(self.small, -num)
        
        # Balance: largest of small should <= smallest of large
        if self.small and self.large and (-self.small[0] > self.large[0]):
            heapq.heappush(self.large, -heapq.heappop(self.small))
        
        # Balance sizes
        if len(self.small) > len(self.large) + 1:
            heapq.heappush(self.large, -heapq.heappop(self.small))
        elif len(self.large) > len(self.small):
            heapq.heappush(self.small, -heapq.heappop(self.large))
    
    def findMedian(self) -> float:
        if len(self.small) > len(self.large):
            return -self.small[0]
        return (-self.small[0] + self.large[0]) / 2

IPO Problem (LC 502)

def find_maximized_capital(k: int, w: int, profits: list[int], capital: list[int]) -> int:
    """Maximize capital after k projects"""
    n = len(profits)
    projects = sorted(zip(capital, profits))
    
    max_heap = []  # Available project profits (negated)
    i = 0
    
    for _ in range(k):
        # Add affordable projects
        while i < n and projects[i][0] <= w:
            heapq.heappush(max_heap, -projects[i][1])
            i += 1
        
        if not max_heap:
            break
        
        w += -heapq.heappop(max_heap)
    
    return w

5️⃣ Schedule/Meeting Problems

Meeting Rooms II (LC 253)

def min_meeting_rooms(intervals: list[list[int]]) -> int:
    """Minimum meeting rooms required"""
    intervals.sort()
    
    rooms = []  # End times
    
    for start, end in intervals:
        if rooms and rooms[0] <= start:
            heapq.heapreplace(rooms, end)
        else:
            heapq.heappush(rooms, end)
    
    return len(rooms)

Task Scheduler (LC 621)

def least_interval(tasks: list[str], n: int) -> int:
    """Minimum time to complete all tasks with cooldown"""
    from collections import Counter
    
    freq = Counter(tasks)
    max_heap = [-f for f in freq.values()]
    heapq.heapify(max_heap)
    
    time = 0
    cooldown = []  # (available_time, count)
    
    while max_heap or cooldown:
        time += 1
        
        if max_heap:
            count = heapq.heappop(max_heap) + 1  # Decrement (negated)
            if count < 0:
                cooldown.append((time + n, count))
        
        if cooldown and cooldown[0][0] == time:
            heapq.heappush(max_heap, cooldown.pop(0)[1])
    
    return time

6️⃣ Graph Shortest Path

Network Delay Time (LC 743)

def network_delay_time(times: list[list[int]], n: int, k: int) -> int:
    """Dijkstra's shortest path"""
    from collections import defaultdict
    
    graph = defaultdict(list)
    for u, v, w in times:
        graph[u].append((v, w))
    
    dist = {k: 0}
    heap = [(0, k)]
    
    while heap:
        d, node = heapq.heappop(heap)
        
        if d > dist.get(node, float('inf')):
            continue
        
        for neighbor, weight in graph[node]:
            new_dist = d + weight
            if new_dist < dist.get(neighbor, float('inf')):
                dist[neighbor] = new_dist
                heapq.heappush(heap, (new_dist, neighbor))
    
    return max(dist.values()) if len(dist) == n else -1

Cheapest Flights Within K Stops (LC 787)

def find_cheapest_price(n: int, flights: list[list[int]], src: int, dst: int, k: int) -> int:
    """Shortest path with at most k stops"""
    from collections import defaultdict
    
    graph = defaultdict(list)
    for u, v, w in flights:
        graph[u].append((v, w))
    
    # (cost, stops, node)
    heap = [(0, 0, src)]
    stops = [float('inf')] * n
    
    while heap:
        cost, num_stops, node = heapq.heappop(heap)
        
        if node == dst:
            return cost
        
        if num_stops > k or num_stops >= stops[node]:
            continue
        
        stops[node] = num_stops
        
        for neighbor, weight in graph[node]:
            heapq.heappush(heap, (cost + weight, num_stops + 1, neighbor))
    
    return -1

7️⃣ Greedy with Heap

Minimum Cost to Connect Sticks (LC 1167)

def connect_sticks(sticks: list[int]) -> int:
    """Minimum cost to connect all sticks"""
    heapq.heapify(sticks)
    total_cost = 0
    
    while len(sticks) > 1:
        first = heapq.heappop(sticks)
        second = heapq.heappop(sticks)
        cost = first + second
        total_cost += cost
        heapq.heappush(sticks, cost)
    
    return total_cost

Maximum Performance of a Team (LC 1383)

def max_performance(n: int, speed: list[int], efficiency: list[int], k: int) -> int:
    """Max sum(speed) * min(efficiency) with at most k engineers"""
    MOD = 10**9 + 7
    
    # Sort by efficiency descending
    engineers = sorted(zip(efficiency, speed), reverse=True)
    
    speed_heap = []  # Min heap of speeds
    speed_sum = 0
    max_perf = 0
    
    for eff, spd in engineers:
        heapq.heappush(speed_heap, spd)
        speed_sum += spd
        
        if len(speed_heap) > k:
            speed_sum -= heapq.heappop(speed_heap)
        
        max_perf = max(max_perf, speed_sum * eff)
    
    return max_perf % MOD

8️⃣ K-way Operations

Find K Pairs with Smallest Sums (LC 373)

def k_smallest_pairs(nums1: list[int], nums2: list[int], k: int) -> list[list[int]]:
    """K pairs with smallest sums from two sorted arrays"""
    if not nums1 or not nums2:
        return []
    
    heap = [(nums1[0] + nums2[0], 0, 0)]
    seen = {(0, 0)}
    result = []
    
    while heap and len(result) < k:
        _, i, j = heapq.heappop(heap)
        result.append([nums1[i], nums2[j]])
        
        if i + 1 < len(nums1) and (i + 1, j) not in seen:
            heapq.heappush(heap, (nums1[i + 1] + nums2[j], i + 1, j))
            seen.add((i + 1, j))
        
        if j + 1 < len(nums2) and (i, j + 1) not in seen:
            heapq.heappush(heap, (nums1[i] + nums2[j + 1], i, j + 1))
            seen.add((i, j + 1))
    
    return result

Kth Smallest Element in a Sorted Matrix (LC 378)

def kth_smallest(matrix: list[list[int]], k: int) -> int:
    """Kth smallest in row and column sorted matrix"""
    n = len(matrix)
    heap = [(matrix[0][0], 0, 0)]
    seen = {(0, 0)}
    
    for _ in range(k):
        val, r, c = heapq.heappop(heap)
        
        if r + 1 < n and (r + 1, c) not in seen:
            heapq.heappush(heap, (matrix[r + 1][c], r + 1, c))
            seen.add((r + 1, c))
        
        if c + 1 < n and (r, c + 1) not in seen:
            heapq.heappush(heap, (matrix[r][c + 1], r, c + 1))
            seen.add((r, c + 1))
    
    return val

9️⃣ Custom Comparator

Sort Characters By Frequency (LC 451)

def frequency_sort(s: str) -> str:
    """Sort characters by frequency descending"""
    from collections import Counter
    
    freq = Counter(s)
    heap = [(-count, char) for char, count in freq.items()]
    heapq.heapify(heap)
    
    result = []
    while heap:
        count, char = heapq.heappop(heap)
        result.append(char * (-count))
    
    return ''.join(result)

Reorganize String (LC 767)

def reorganize_string(s: str) -> str:
    """Rearrange so no adjacent chars same"""
    from collections import Counter
    
    freq = Counter(s)
    max_freq = max(freq.values())
    
    if max_freq > (len(s) + 1) // 2:
        return ""
    
    heap = [(-count, char) for char, count in freq.items()]
    heapq.heapify(heap)
    
    result = []
    prev_count, prev_char = 0, ''
    
    while heap:
        count, char = heapq.heappop(heap)
        result.append(char)
        
        if prev_count < 0:
            heapq.heappush(heap, (prev_count, prev_char))
        
        prev_count = count + 1
        prev_char = char
    
    return ''.join(result)

🔟 Lazy Deletion Pattern

Design Twitter (LC 355)

class Twitter:
    def __init__(self):
        self.time = 0
        self.tweets = {}      # user -> [(time, tweet_id)]
        self.following = {}   # user -> set of followees
    
    def postTweet(self, userId: int, tweetId: int) -> None:
        if userId not in self.tweets:
            self.tweets[userId] = []
        self.tweets[userId].append((self.time, tweetId))
        self.time += 1
    
    def getNewsFeed(self, userId: int) -> list[int]:
        # Get all relevant users
        users = {userId}
        if userId in self.following:
            users.update(self.following[userId])
        
        # Merge tweets using heap
        heap = []
        for user in users:
            if user in self.tweets:
                tweets = self.tweets[user]
                if tweets:
                    idx = len(tweets) - 1
                    heapq.heappush(heap, (-tweets[idx][0], tweets[idx][1], user, idx))
        
        result = []
        while heap and len(result) < 10:
            _, tweet_id, user, idx = heapq.heappop(heap)
            result.append(tweet_id)
            
            if idx > 0:
                idx -= 1
                t = self.tweets[user][idx]
                heapq.heappush(heap, (-t[0], t[1], user, idx))
        
        return result
    
    def follow(self, followerId: int, followeeId: int) -> None:
        if followerId not in self.following:
            self.following[followerId] = set()
        self.following[followerId].add(followeeId)
    
    def unfollow(self, followerId: int, followeeId: int) -> None:
        if followerId in self.following:
            self.following[followerId].discard(followeeId)

📊 Complexity Summary

OperationMin HeapMax Heap
InsertO(log n)O(log n)
ExtractO(log n)O(log n)
PeekO(1)O(1)
HeapifyO(n)O(n)

ProblemLC #Pattern
Kth Largest Element215Top K
Top K Frequent Elements347Top K
Merge K Sorted Lists23K-way Merge
Sliding Window Maximum239Deque/Heap
Find Median from Data Stream295Two Heaps
Meeting Rooms II253Schedule
Task Scheduler621Greedy + Heap
Network Delay Time743Dijkstra
K Closest Points973Top K
Reorganize String767Greedy

Last Updated: 2024