Hard Level - Complete Solutions#

This notebook contains solutions to all exercises from the Hard level lessons.

⚠️ Important Notes#

  • Attempt first: Try solving exercises before looking at solutions

  • Multiple approaches: Many problems have several valid solutions

  • Understand deeply: Don’t just copy code - understand the algorithms

  • Complexity analysis: Pay attention to time and space complexity

  • Test thoroughly: Use edge cases and stress tests

Contents#

  1. 01 - Advanced Functions and Decorators

  2. 02 - Generators and Iterators

  3. 03 - Algorithms and Complexity

  4. 04 - Deep Learning and Neural Networks

  5. 05 - Advanced ML and NLP

  6. 06 - Computer Systems and Theory

  7. 08 - Classic Problems - Additional Challenges

  8. 09 - CTF Challenges


01 - Advanced Functions and Decorators#

Exercise: Memoization Decorator#

Create a memoization decorator that caches function results.

from functools import wraps
import time

def memoize(func):
    """
    Memoization decorator with cache.
    
    Time: O(1) for cached results, Space: O(n) for cache
    """
    cache = {}
    
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Create hashable key from arguments
        # Note: This works for hashable args only (numbers, strings, tuples)
        key = (args, tuple(sorted(kwargs.items())))
        
        if key in cache:
            print(f"Cache hit for {func.__name__}{args}")
            return cache[key]
        
        print(f"Computing {func.__name__}{args}")
        result = func(*args, **kwargs)
        cache[key] = result
        return result
    
    # Expose cache for inspection
    wrapper.cache = cache
    wrapper.cache_clear = lambda: cache.clear()
    
    return wrapper

# Test with Fibonacci - classic memoization example
@memoize
def fibonacci(n):
    """Calculate nth Fibonacci number."""
    if n <= 1:
        return n
    return fibonacci(n-1) + fibonacci(n-2)

print("\n=== Testing Memoization ===")
print(f"fib(10) = {fibonacci(10)}")
print(f"\nCache size: {len(fibonacci.cache)}")
print(f"Cached values: {dict(list(fibonacci.cache.items())[:5])}")

# Compare performance
def fibonacci_no_memo(n):
    if n <= 1:
        return n
    return fibonacci_no_memo(n-1) + fibonacci_no_memo(n-2)

print("\n=== Performance Comparison ===")
fibonacci.cache_clear()  # Clear cache

start = time.perf_counter()
result1 = fibonacci(30)
time1 = time.perf_counter() - start

start = time.perf_counter()
result2 = fibonacci_no_memo(30)
time2 = time.perf_counter() - start

print(f"With memoization: {time1:.6f}s")
print(f"Without memoization: {time2:.6f}s")
if time1 > 0:
    print(f"Speedup: {time2/time1:.1f}x faster!")

Alternative: Using functools.lru_cache#

from functools import lru_cache

# Python's built-in LRU cache with max size
@lru_cache(maxsize=128)
def fibonacci_lru(n):
    """Fibonacci with LRU cache (Least Recently Used)."""
    if n <= 1:
        return n
    return fibonacci_lru(n-1) + fibonacci_lru(n-2)

print("Using functools.lru_cache:")
result = fibonacci_lru(50)
print(f"fib(50) = {result}")
print(f"Cache info: {fibonacci_lru.cache_info()}")

02 - Generators and Iterators#

Exercise: Generator Pipeline#

Create a generator pipeline that generates, filters, and transforms numbers.

import random

def number_generator(n):
    """
    Generate n random numbers between 1 and 100.
    
    Space: O(1) - yields one at a time
    """
    for _ in range(n):
        yield random.randint(1, 100)

def divisible_by_3_and_5(numbers):
    """
    Filter numbers divisible by both 3 and 5 (i.e., divisible by 15).
    """
    for num in numbers:
        if num % 15 == 0:  # Divisible by both 3 and 5
            yield num

def multiply_by_2(numbers):
    """
    Transform each number by multiplying by 2.
    """
    for num in numbers:
        yield num * 2

# Complete pipeline
def complete_pipeline(count):
    """
    Complete generator pipeline:
    Generate -> Filter (div by 15) -> Transform (multiply by 2)
    """
    numbers = number_generator(count)
    filtered = divisible_by_3_and_5(numbers)
    transformed = multiply_by_2(filtered)
    return transformed

# Test the pipeline
print("=== Generator Pipeline ===")
random.seed(42)  # For reproducibility
pipeline = complete_pipeline(100)

results = list(pipeline)
print(f"Generated 100 numbers, found {len(results)} divisible by 15")
print(f"Transformed results: {results}")

# Verify: each result, divided by 2, should recover an original number divisible by 15
print("\nVerification:")
for num in results[:5]:  # Check first 5
    original = num // 2
    print(f"  {num} -> original {original}, divisible by 15: {original % 15 == 0}")

Alternative: One-liner with Generator Expressions#

# Compact version using generator expressions
def pipeline_compact(count):
    """Compact pipeline using generator expressions."""
    return (
        num * 2 
        for num in (random.randint(1, 100) for _ in range(count))
        if num % 15 == 0
    )

random.seed(42)
results_compact = list(pipeline_compact(100))
print(f"\nCompact version results: {results_compact}")

Bonus: Infinite Generator with Limiting#

from itertools import islice

def infinite_random_numbers():
    """Infinite generator of random numbers."""
    while True:
        yield random.randint(1, 100)

# Take only first 10 numbers that pass the filter
random.seed(42)
pipeline_infinite = multiply_by_2(
    divisible_by_3_and_5(
        infinite_random_numbers()
    )
)

first_10 = list(islice(pipeline_infinite, 10))
print(f"\nFirst 10 from infinite pipeline: {first_10}")

03 - Algorithms and Complexity#

Exercise: Dijkstra’s Shortest Path Algorithm#

Implement Dijkstra’s algorithm to find shortest paths in a weighted graph.

import heapq
from collections import defaultdict
from typing import Dict, List, Tuple

class WeightedGraph:
    """
    Weighted directed graph using adjacency list.
    """
    def __init__(self):
        self.graph = defaultdict(list)  # {node: [(neighbor, weight), ...]}
        self.nodes = set()
    
    def add_edge(self, u, v, weight):
        """Add directed edge from u to v with given weight."""
        self.graph[u].append((v, weight))
        self.nodes.add(u)
        self.nodes.add(v)
    
    def add_undirected_edge(self, u, v, weight):
        """Add undirected edge (bidirectional)."""
        self.add_edge(u, v, weight)
        self.add_edge(v, u, weight)
    
    def dijkstra(self, start):
        """
        Dijkstra's shortest path algorithm.
        
        Time: O((V + E) log V) with min-heap
        Space: O(V)
        
        Returns:
            distances: Dict mapping node to shortest distance from start
            previous: Dict mapping node to previous node in shortest path
        """
        # Initialize distances to infinity
        distances = {node: float('inf') for node in self.nodes}
        distances[start] = 0
        
        # Track previous node in path for reconstruction
        previous = {node: None for node in self.nodes}
        
        # Min heap: (distance, node)
        pq = [(0, start)]
        visited = set()
        
        while pq:
            current_dist, current = heapq.heappop(pq)
            
            # Skip if already visited (handles duplicate entries)
            if current in visited:
                continue
            
            visited.add(current)
            
            # Explore neighbors
            for neighbor, weight in self.graph[current]:
                distance = current_dist + weight
                
                # Found shorter path
                if distance < distances[neighbor]:
                    distances[neighbor] = distance
                    previous[neighbor] = current
                    heapq.heappush(pq, (distance, neighbor))
        
        return distances, previous
    
    def reconstruct_path(self, previous, start, end):
        """
        Reconstruct shortest path from start to end.
        """
        path = []
        current = end
        
        while current is not None:
            path.append(current)
            current = previous[current]
        
        path.reverse()
        
        # Check if path is valid
        if path[0] == start:
            return path
        return []  # No path exists

# Test with example graph
print("=== Dijkstra's Shortest Path ===")

# Create graph
#     (1)---4---(2)
#     /|        /|\
#    1 |       1 | 5
#   /  2      /  | \
# (0)  |    (4)  3  (5)
#   \  |         | /
#    1 |         2/
#     \|        /
#     (3)------/

g = WeightedGraph()
g.add_undirected_edge(0, 1, 1)
g.add_undirected_edge(0, 3, 1)
g.add_undirected_edge(1, 2, 4)
g.add_undirected_edge(1, 3, 2)
g.add_undirected_edge(2, 4, 1)
g.add_undirected_edge(2, 5, 5)
g.add_undirected_edge(3, 5, 2)
g.add_undirected_edge(4, 5, 3)

# Run Dijkstra from node 0
start_node = 0
distances, previous = g.dijkstra(start_node)

print(f"\nShortest distances from node {start_node}:")
for node in sorted(distances.keys()):
    dist = distances[node]
    if dist == float('inf'):
        print(f"  Node {node}: unreachable")
    else:
        path = g.reconstruct_path(previous, start_node, node)
        path_str = " -> ".join(map(str, path))
        print(f"  Node {node}: distance {dist}, path: {path_str}")

# Verify specific path
end_node = 5
path = g.reconstruct_path(previous, start_node, end_node)
print(f"\n✓ Shortest path from {start_node} to {end_node}: {' -> '.join(map(str, path))}")
print(f"  Total distance: {distances[end_node]}")

Complexity Analysis#

  • Time Complexity: O((V + E) log V)

    • Each vertex is processed once: O(V)

    • Each edge is relaxed at most once: O(E)

    • Each heap push/pop costs O(log V) (the heap holds at most O(E) entries, and log E = O(log V) for simple graphs)

  • Space Complexity: O(V + E)

    • Distances dict: O(V)

    • Previous dict: O(V)

    • Priority queue: up to O(E) entries with this lazy-deletion approach (stale duplicates are skipped via the visited set)

    • Visited set: O(V)


04 - Deep Learning and Neural Networks#

Exercise: CNN for CIFAR-10#

Build and train a Convolutional Neural Network for CIFAR-10 image classification.

# Note: This requires TensorFlow/Keras
# pip install tensorflow

try:
    import tensorflow as tf
    from tensorflow import keras
    from tensorflow.keras import layers, models
    from tensorflow.keras.preprocessing.image import ImageDataGenerator
    import numpy as np
    import matplotlib.pyplot as plt
    
    print(f"TensorFlow version: {tf.__version__}")
    
    # Load CIFAR-10 dataset
    print("\n=== Loading CIFAR-10 Dataset ===")
    (X_train, y_train), (X_test, y_test) = keras.datasets.cifar10.load_data()
    
    print(f"Training set: {X_train.shape}")
    print(f"Test set: {X_test.shape}")
    
    # Class names
    class_names = ['airplane', 'automobile', 'bird', 'cat', 'deer', 
                   'dog', 'frog', 'horse', 'ship', 'truck']
    
    # Normalize pixel values to [0, 1]
    X_train = X_train.astype('float32') / 255.0
    X_test = X_test.astype('float32') / 255.0
    
    # Convert labels to categorical
    y_train = keras.utils.to_categorical(y_train, 10)
    y_test = keras.utils.to_categorical(y_test, 10)
    
    # Build CNN architecture
    print("\n=== Building CNN Model ===")
    
    model = models.Sequential([
        # Convolutional Block 1
        layers.Conv2D(32, (3, 3), activation='relu', padding='same', 
                      input_shape=(32, 32, 3)),
        layers.BatchNormalization(),
        layers.Conv2D(32, (3, 3), activation='relu', padding='same'),
        layers.BatchNormalization(),
        layers.MaxPooling2D((2, 2)),
        layers.Dropout(0.2),
        
        # Convolutional Block 2
        layers.Conv2D(64, (3, 3), activation='relu', padding='same'),
        layers.BatchNormalization(),
        layers.Conv2D(64, (3, 3), activation='relu', padding='same'),
        layers.BatchNormalization(),
        layers.MaxPooling2D((2, 2)),
        layers.Dropout(0.3),
        
        # Convolutional Block 3
        layers.Conv2D(128, (3, 3), activation='relu', padding='same'),
        layers.BatchNormalization(),
        layers.Conv2D(128, (3, 3), activation='relu', padding='same'),
        layers.BatchNormalization(),
        layers.MaxPooling2D((2, 2)),
        layers.Dropout(0.4),
        
        # Fully Connected Layers
        layers.Flatten(),
        layers.Dense(128, activation='relu'),
        layers.BatchNormalization(),
        layers.Dropout(0.5),
        layers.Dense(10, activation='softmax')
    ])
    
    model.summary()
    
    # Compile model
    model.compile(
        optimizer='adam',
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )
    
    # Data augmentation
    print("\n=== Setting up Data Augmentation ===")
    datagen = ImageDataGenerator(
        rotation_range=15,
        width_shift_range=0.1,
        height_shift_range=0.1,
        horizontal_flip=True,
        zoom_range=0.1
    )
    datagen.fit(X_train)
    
    # Train model (reduced epochs for demo)
    print("\n=== Training Model ===")
    print("(Using 5 epochs for demonstration - use 50+ for production)")
    
    history = model.fit(
        datagen.flow(X_train, y_train, batch_size=64),
        epochs=5,  # Use 50-100 for real training
        validation_data=(X_test, y_test),
        verbose=1
    )
    
    # Evaluate
    print("\n=== Final Evaluation ===")
    test_loss, test_acc = model.evaluate(X_test, y_test, verbose=0)
    print(f"Test accuracy: {test_acc:.4f}")
    print(f"Test loss: {test_loss:.4f}")
    
    # Make some predictions
    predictions = model.predict(X_test[:5])
    print("\n=== Sample Predictions ===")
    for i in range(5):
        pred_class = np.argmax(predictions[i])
        true_class = np.argmax(y_test[i])
        confidence = predictions[i][pred_class]
        print(f"Image {i}: Predicted '{class_names[pred_class]}' "
              f"({confidence:.2%}), True: '{class_names[true_class]}'")
    
    print("\n✓ CNN training complete!")
    print("\nNote: For better accuracy (80-90%), train for 50-100 epochs with:")
    print("  - Learning rate scheduling")
    print("  - More aggressive data augmentation")
    print("  - Deeper architecture (ResNet, EfficientNet)")
    
except ImportError:
    print("TensorFlow not installed. Install with: pip install tensorflow")
except Exception as e:
    print(f"Error: {e}")
    print("\nThis exercise requires TensorFlow and may take several minutes to train.")

Architecture Explanation#

3 Convolutional Blocks:

  • Block 1: 32 filters, captures basic features (edges, corners)

  • Block 2: 64 filters, captures intermediate features (textures, patterns)

  • Block 3: 128 filters, captures high-level features (parts, objects)

Key Techniques:

  • Batch Normalization: Stabilizes training, allows higher learning rates

  • Dropout: Prevents overfitting (0.2 → 0.5 progressively)

  • Data Augmentation: Increases effective training data

  • Adam Optimizer: Adaptive learning rate

Expected Performance:

  • With 5 epochs: ~60-70% accuracy

  • With 50+ epochs: ~80-85% accuracy

  • State-of-the-art: 95%+ (with advanced architectures)
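
The training notes above mention learning rate scheduling as one way to push accuracy higher. A minimal sketch of how that might be wired up with Keras callbacks (the monitor names, factors, and patience values here are illustrative assumptions, not tuned settings):

try:
    from tensorflow import keras

    callbacks = [
        # Halve the learning rate when validation loss plateaus
        keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5,
                                          patience=3, verbose=1),
        # Stop early and keep the best weights if validation loss stops improving
        keras.callbacks.EarlyStopping(monitor='val_loss', patience=10,
                                      restore_best_weights=True),
    ]

    # Pass callbacks=callbacks to model.fit(...) in the training cell above,
    # e.g. model.fit(..., epochs=50, callbacks=callbacks)
except ImportError:
    print("TensorFlow not installed. Install with: pip install tensorflow")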


05 - Advanced ML and NLP#

Exercise: Advanced Text Classification#

Build a text classification system comparing different vectorization methods.

# Text classification with multiple vectorization methods

import numpy as np
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.naive_bayes import MultinomialNB
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, accuracy_score
import re

# Sample news articles dataset
print("=== Text Classification System ===")

# Create sample dataset (in practice, use real news dataset like 20newsgroups)
texts = [
    # Technology
    "New AI model achieves breakthrough in natural language processing",
    "Tech company releases innovative smartphone with advanced features",
    "Quantum computing advances bring us closer to practical applications",
    "Cybersecurity experts warn of new malware targeting businesses",
    "Software update brings performance improvements to popular app",
    "Machine learning algorithm predicts market trends accurately",
    "Cloud computing platform expands global infrastructure",
    "Developers release open-source framework for web applications",
    
    # Sports
    "Championship team wins decisive victory in final game",
    "Star athlete breaks long-standing record in track event",
    "Football match ends in dramatic penalty shootout",
    "Olympic gold medalist announces retirement from competition",
    "Basketball team secures playoff spot with winning streak",
    "Tennis player advances to finals after intense match",
    "Soccer club signs international superstar in major transfer",
    "Marathon runner completes race in record-breaking time",
    
    # Politics
    "Government announces new policy on environmental protection",
    "Election results show close race between candidates",
    "Lawmakers debate controversial bill in heated session",
    "International summit brings world leaders together",
    "Political party unveils campaign strategy for upcoming election",
    "Senate votes on infrastructure spending package",
    "Diplomatic negotiations continue amid international tensions",
    "President addresses nation on economic recovery plans",
]

labels = [
    'tech', 'tech', 'tech', 'tech', 'tech', 'tech', 'tech', 'tech',
    'sports', 'sports', 'sports', 'sports', 'sports', 'sports', 'sports', 'sports',
    'politics', 'politics', 'politics', 'politics', 'politics', 'politics', 'politics', 'politics'
]

# Preprocessing function
def preprocess_text(text):
    """
    Advanced text preprocessing.
    """
    # Lowercase
    text = text.lower()
    
    # Remove special characters but keep spaces
    text = re.sub(r'[^a-z\s]', '', text)
    
    # Remove extra whitespace
    text = ' '.join(text.split())
    
    return text

# Preprocess all texts
texts_clean = [preprocess_text(text) for text in texts]

# Split data
X_train, X_test, y_train, y_test = train_test_split(
    texts_clean, labels, test_size=0.25, random_state=42, stratify=labels
)

print(f"\nDataset: {len(texts)} articles across 3 categories")
print(f"Training: {len(X_train)}, Testing: {len(X_test)}")

# Method 1: Bag of Words (CountVectorizer)
print("\n=== Method 1: Bag of Words ===")
bow_vectorizer = CountVectorizer(max_features=100, stop_words='english')
X_train_bow = bow_vectorizer.fit_transform(X_train)
X_test_bow = bow_vectorizer.transform(X_test)

bow_model = MultinomialNB()
bow_model.fit(X_train_bow, y_train)
bow_pred = bow_model.predict(X_test_bow)
bow_acc = accuracy_score(y_test, bow_pred)

print(f"Vocabulary size: {len(bow_vectorizer.get_feature_names_out())}")
print(f"Accuracy: {bow_acc:.4f}")

# Method 2: TF-IDF
print("\n=== Method 2: TF-IDF ===")
tfidf_vectorizer = TfidfVectorizer(max_features=100, stop_words='english')
X_train_tfidf = tfidf_vectorizer.fit_transform(X_train)
X_test_tfidf = tfidf_vectorizer.transform(X_test)

tfidf_model = LogisticRegression(max_iter=1000, random_state=42)
tfidf_model.fit(X_train_tfidf, y_train)
tfidf_pred = tfidf_model.predict(X_test_tfidf)
tfidf_acc = accuracy_score(y_test, tfidf_pred)

print(f"Accuracy: {tfidf_acc:.4f}")

# Comparison
print("\n=== Method Comparison ===")
print(f"Bag of Words:  {bow_acc:.4f}")
print(f"TF-IDF:        {tfidf_acc:.4f}")

# Test on new text
print("\n=== Testing on New Articles ===")
new_texts = [
    "Revolutionary neural network architecture improves computer vision",
    "National team qualifies for world championship tournament",
    "Congressional hearing addresses national security concerns"
]

for text in new_texts:
    clean_text = preprocess_text(text)
    vec = tfidf_vectorizer.transform([clean_text])
    pred = tfidf_model.predict(vec)[0]
    proba = tfidf_model.predict_proba(vec)[0]
    confidence = max(proba)
    print(f"\nText: '{text[:60]}...'")
    print(f"Predicted: {pred} (confidence: {confidence:.2%})")

print("\n✓ Text classification complete!")

Method Comparison#

Bag of Words (CountVectorizer):

  • Simple word frequency counts

  • Good for short texts

  • Ignores word importance

TF-IDF (Term Frequency-Inverse Document Frequency):

  • Weights words by importance

  • Down-weights common words

  • Better for longer documents

Word2Vec/Word Embeddings (not shown due to training time):

  • Captures semantic meaning

  • Understands word relationships

  • Requires large training corpus

For production systems, consider:

  • Use sklearn.datasets.fetch_20newsgroups() for real data (see the sketch after this list)

  • Add stemming/lemmatization with nltk or spacy

  • Try transformer models (BERT, RoBERTa) for SOTA results
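
A minimal sketch of the same TF-IDF pipeline on real data, using scikit-learn's bundled 20 Newsgroups loader (note: it downloads the corpus on first use; the category names below are just three of the twenty available):

from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression

categories = ['sci.space', 'rec.sport.baseball', 'talk.politics.misc']
train = fetch_20newsgroups(subset='train', categories=categories,
                           remove=('headers', 'footers', 'quotes'))
test = fetch_20newsgroups(subset='test', categories=categories,
                          remove=('headers', 'footers', 'quotes'))

vectorizer = TfidfVectorizer(max_features=10000, stop_words='english')
X_tr = vectorizer.fit_transform(train.data)
X_te = vectorizer.transform(test.data)

clf = LogisticRegression(max_iter=1000)
clf.fit(X_tr, train.target)
print(f"20 Newsgroups accuracy: {clf.score(X_te, test.target):.4f}")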


06 - Computer Systems and Theory#

Exercise: Round Robin Process Scheduler#

Implement a Round Robin CPU scheduling algorithm.

from collections import deque
from dataclasses import dataclass
from typing import List

@dataclass
class Process:
    """Represents a process in the system."""
    pid: int  # Process ID
    burst_time: int  # Total CPU time needed
    arrival_time: int = 0  # When process arrives
    remaining_time: Optional[int] = None  # Time left to complete
    start_time: Optional[int] = None  # When first executed
    completion_time: Optional[int] = None  # When completed
    
    def __post_init__(self):
        if self.remaining_time is None:
            self.remaining_time = self.burst_time
    
    def __repr__(self):
        return f"P{self.pid}(burst={self.burst_time}, remaining={self.remaining_time})"

class RoundRobinScheduler:
    """
    Round Robin CPU Scheduler.
    
    Each process gets a time quantum. If not finished, goes to back of queue.
    """
    def __init__(self, time_quantum: int = 2):
        self.time_quantum = time_quantum
        self.ready_queue = deque()
        self.current_time = 0
        self.execution_log = []
    
    def add_process(self, process: Process):
        """Add process to ready queue."""
        self.ready_queue.append(process)
    
    def schedule(self, processes: List[Process]):
        """
        Execute Round Robin scheduling.
        
        Time: O(n * burst_time), Space: O(n)
        """
        # Sort by arrival time and add to queue
        processes = sorted(processes, key=lambda p: p.arrival_time)
        
        # Add initially arrived processes
        process_idx = 0
        while process_idx < len(processes) and processes[process_idx].arrival_time <= self.current_time:
            self.ready_queue.append(processes[process_idx])
            process_idx += 1
        
        # Main scheduling loop
        while self.ready_queue or process_idx < len(processes):
            # Add newly arrived processes
            while process_idx < len(processes) and processes[process_idx].arrival_time <= self.current_time:
                self.ready_queue.append(processes[process_idx])
                process_idx += 1
            
            # If no process ready, advance time
            if not self.ready_queue:
                if process_idx < len(processes):
                    self.current_time = processes[process_idx].arrival_time
                continue
            
            # Get next process
            current_process = self.ready_queue.popleft()
            
            # Record start time
            if current_process.start_time is None:
                current_process.start_time = self.current_time
            
            # Execute for time quantum or remaining time
            execution_time = min(self.time_quantum, current_process.remaining_time)
            
            # Log execution
            self.execution_log.append({
                'time': self.current_time,
                'process': current_process.pid,
                'duration': execution_time
            })
            
            # Update process
            current_process.remaining_time -= execution_time
            self.current_time += execution_time
            
            # Add newly arrived processes before re-queuing
            while process_idx < len(processes) and processes[process_idx].arrival_time <= self.current_time:
                self.ready_queue.append(processes[process_idx])
                process_idx += 1
            
            # Check if process completed
            if current_process.remaining_time == 0:
                current_process.completion_time = self.current_time
            else:
                # Put back in queue
                self.ready_queue.append(current_process)
    
    def calculate_metrics(self, processes: List[Process]):
        """
        Calculate scheduling metrics.
        """
        metrics = []
        
        for p in processes:
            turnaround_time = p.completion_time - p.arrival_time
            waiting_time = turnaround_time - p.burst_time
            response_time = p.start_time - p.arrival_time
            
            metrics.append({
                'pid': p.pid,
                'arrival': p.arrival_time,
                'burst': p.burst_time,
                'completion': p.completion_time,
                'turnaround': turnaround_time,
                'waiting': waiting_time,
                'response': response_time
            })
        
        return metrics
    
    def print_gantt_chart(self):
        """Print Gantt chart of execution."""
        print("\nGantt Chart:")
        print("Time: ", end="")
        for entry in self.execution_log:
            print(f"{entry['time']:3d} ", end="")
        print(f"{self.current_time:3d}")
        
        print("Proc: ", end="")
        for entry in self.execution_log:
            print(f" P{entry['process']} ", end="")
        print()

# Test the scheduler
print("=== Round Robin Process Scheduler ===")
print("\nTime Quantum: 2")

# Create processes
processes = [
    Process(pid=1, burst_time=5, arrival_time=0),
    Process(pid=2, burst_time=3, arrival_time=1),
    Process(pid=3, burst_time=8, arrival_time=2),
    Process(pid=4, burst_time=6, arrival_time=3),
]

print("\nProcesses:")
for p in processes:
    print(f"  P{p.pid}: Burst Time = {p.burst_time}, Arrival = {p.arrival_time}")

# Run scheduler
scheduler = RoundRobinScheduler(time_quantum=2)
scheduler.schedule(processes)

# Print Gantt chart
scheduler.print_gantt_chart()

# Calculate and print metrics
metrics = scheduler.calculate_metrics(processes)

print("\n=== Scheduling Metrics ===")
print(f"{'PID':<5} {'Arrival':<8} {'Burst':<7} {'Complete':<10} {'Turnaround':<12} {'Waiting':<9} {'Response':<9}")
print("-" * 70)

total_turnaround = 0
total_waiting = 0
total_response = 0

for m in metrics:
    print(f"P{m['pid']:<4} {m['arrival']:<8} {m['burst']:<7} {m['completion']:<10} "
          f"{m['turnaround']:<12} {m['waiting']:<9} {m['response']:<9}")
    total_turnaround += m['turnaround']
    total_waiting += m['waiting']
    total_response += m['response']

n = len(metrics)
print("\n=== Averages ===")
print(f"Average Turnaround Time: {total_turnaround/n:.2f}")
print(f"Average Waiting Time: {total_waiting/n:.2f}")
print(f"Average Response Time: {total_response/n:.2f}")
print(f"\nCPU Utilization: 100% (no idle time)")
print(f"Throughput: {n/scheduler.current_time:.2f} processes/unit time")

print("\n✓ Round Robin scheduling complete!")

Scheduling Metrics Explained#

Turnaround Time: Total time from arrival to completion

  • Formula: Completion Time - Arrival Time

  • Lower is better

Waiting Time: Time spent waiting in ready queue

  • Formula: Turnaround Time - Burst Time

  • Lower is better

Response Time: Time from arrival to first execution

  • Formula: Start Time - Arrival Time

  • Lower is better
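
Worked example: with the quantum-2 schedule produced above, P1 (arrival 0, burst 5) completes at time 14, so its turnaround time is 14 - 0 = 14, its waiting time is 14 - 5 = 9, and its response time is 0 - 0 = 0, since P1 is the first process dispatched.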

Round Robin Characteristics:

  • Fair: All processes get equal time

  • Good response time: Processes don’t wait long

  • Higher turnaround for long processes

  • Time quantum selection critical (see the sketch after this list):

    • Too small: High context switch overhead

    • Too large: Degenerates to FCFS
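
A small experiment with the scheduler defined above, re-running the same workload with several quanta to show how the choice affects average waiting time (note: this simulation ignores context-switch overhead, so very small quanta look better here than they would on real hardware):

for q in (1, 2, 4, 8):
    procs = [
        Process(pid=1, burst_time=5, arrival_time=0),
        Process(pid=2, burst_time=3, arrival_time=1),
        Process(pid=3, burst_time=8, arrival_time=2),
        Process(pid=4, burst_time=6, arrival_time=3),
    ]
    sched = RoundRobinScheduler(time_quantum=q)
    sched.schedule(procs)
    stats = sched.calculate_metrics(procs)
    avg_wait = sum(s['waiting'] for s in stats) / len(stats)
    print(f"Quantum {q}: average waiting time = {avg_wait:.2f}")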


08 - Classic Problems - Additional Challenges#

Solutions to the 5 challenge problems from the Classic Problems notebook.

Challenge 1: Four Sum (LeetCode #18)#

Extend Three Sum to find all unique quadruplets that sum to target.

def four_sum(nums: list[int], target: int) -> list[list[int]]:
    """
    Find all unique quadruplets that sum to target.
    
    Time: O(n³), Space: O(1) excluding output
    
    Strategy: Sort + fix two numbers + two pointers for remaining
    """
    nums.sort()
    n = len(nums)
    result = []
    
    # Fix first number
    for i in range(n - 3):
        # Skip duplicates for first number
        if i > 0 and nums[i] == nums[i-1]:
            continue
        
        # Fix second number
        for j in range(i + 1, n - 2):
            # Skip duplicates for second number
            if j > i + 1 and nums[j] == nums[j-1]:
                continue
            
            # Two pointers for remaining sum
            left = j + 1
            right = n - 1
            remaining = target - nums[i] - nums[j]
            
            while left < right:
                current_sum = nums[left] + nums[right]
                
                if current_sum == remaining:
                    result.append([nums[i], nums[j], nums[left], nums[right]])
                    
                    # Skip duplicates
                    while left < right and nums[left] == nums[left + 1]:
                        left += 1
                    while left < right and nums[right] == nums[right - 1]:
                        right -= 1
                    
                    left += 1
                    right -= 1
                elif current_sum < remaining:
                    left += 1
                else:
                    right -= 1
    
    return result

# Test
print("=== Challenge 1: Four Sum ===")
test_cases = [
    ([1, 0, -1, 0, -2, 2], 0),
    ([2, 2, 2, 2, 2], 8),
    ([1, 2, 3, 4, 5], 10),
]

for nums, target in test_cases:
    result = four_sum(nums, target)
    print(f"\nInput: {nums}, Target: {target}")
    print(f"Quadruplets: {result}")

print("\n✓ Four Sum complete!")

Challenge 2: A* Search Algorithm#

Implement A* pathfinding with heuristic function.

import heapq
from typing import Tuple, List, Set, Dict

def a_star_search(grid: List[List[int]], start: Tuple[int, int], 
                  goal: Tuple[int, int]) -> List[Tuple[int, int]]:
    """
    A* pathfinding algorithm.
    
    Time: O(b^d) where b is branching factor, d is depth
    Space: O(b^d)
    
    Args:
        grid: 2D grid where 0 = walkable, 1 = obstacle
        start: Starting position (row, col)
        goal: Goal position (row, col)
    
    Returns:
        List of positions from start to goal
    """
    rows, cols = len(grid), len(grid[0])
    
    def heuristic(pos: Tuple[int, int]) -> float:
        """Manhattan distance heuristic."""
        return abs(pos[0] - goal[0]) + abs(pos[1] - goal[1])
    
    def get_neighbors(pos: Tuple[int, int]) -> List[Tuple[int, int]]:
        """Get valid neighbors (4-directional)."""
        r, c = pos
        neighbors = []
        
        for dr, dc in [(0, 1), (1, 0), (0, -1), (-1, 0)]:
            nr, nc = r + dr, c + dc
            if (0 <= nr < rows and 0 <= nc < cols and grid[nr][nc] == 0):
                neighbors.append((nr, nc))
        
        return neighbors
    
    # Priority queue: (f_score, position)
    # f_score = g_score + h_score
    open_set = [(heuristic(start), start)]
    
    # Track best path to each node
    came_from: Dict[Tuple, Tuple] = {}
    
    # g_score: actual cost from start
    g_score = {start: 0}
    
    # f_score: estimated total cost
    f_score = {start: heuristic(start)}
    
    # Track visited
    closed_set: Set[Tuple] = set()
    
    while open_set:
        _, current = heapq.heappop(open_set)
        
        # Skip if already visited
        if current in closed_set:
            continue
        
        # Goal reached!
        if current == goal:
            # Reconstruct path
            path = []
            while current in came_from:
                path.append(current)
                current = came_from[current]
            path.append(start)
            return list(reversed(path))
        
        closed_set.add(current)
        
        # Explore neighbors
        for neighbor in get_neighbors(current):
            if neighbor in closed_set:
                continue
            
            # Cost is 1 for each step (uniform cost)
            tentative_g = g_score[current] + 1
            
            # Found better path to neighbor
            if neighbor not in g_score or tentative_g < g_score[neighbor]:
                came_from[neighbor] = current
                g_score[neighbor] = tentative_g
                f_score[neighbor] = tentative_g + heuristic(neighbor)
                heapq.heappush(open_set, (f_score[neighbor], neighbor))
    
    return []  # No path found

# Test
print("=== Challenge 2: A* Search ===")

# Create grid (0 = walkable, 1 = obstacle)
grid = [
    [0, 0, 0, 0, 0],
    [0, 1, 1, 1, 0],
    [0, 0, 0, 1, 0],
    [0, 1, 0, 0, 0],
    [0, 0, 0, 0, 0],
]

start = (0, 0)
goal = (4, 4)

print("\nGrid (S=start, G=goal, X=obstacle):")
for i, row in enumerate(grid):
    for j, cell in enumerate(row):
        if (i, j) == start:
            print('S', end=' ')
        elif (i, j) == goal:
            print('G', end=' ')
        elif cell == 1:
            print('X', end=' ')
        else:
            print('.', end=' ')
    print()

path = a_star_search(grid, start, goal)

print(f"\nPath found: {path}")
print(f"Path length: {len(path)}")

# Visualize path
print("\nPath visualization:")
path_set = set(path)
for i, row in enumerate(grid):
    for j, cell in enumerate(row):
        if (i, j) == start:
            print('S', end=' ')
        elif (i, j) == goal:
            print('G', end=' ')
        elif (i, j) in path_set:
            print('*', end=' ')
        elif cell == 1:
            print('X', end=' ')
        else:
            print('.', end=' ')
    print()

print("\n✓ A* search complete!")

A* vs Dijkstra#

Dijkstra:

  • f(n) = g(n)

  • Explores all directions equally

  • Guaranteed optimal

A*:

  • f(n) = g(n) + h(n)

  • Uses heuristic to guide search toward goal

  • Much faster in practice

  • Optimal if heuristic is admissible (never overestimates)

Common Heuristics (sketched in code below):

  • Manhattan distance (4-directional movement)

  • Euclidean distance (diagonal movement)

  • Chebyshev distance (8-directional)
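
A minimal sketch of the three heuristics as drop-in replacements for heuristic() above (positions are (row, col) tuples):

import math

def manhattan(a, b):
    """4-directional movement: |dr| + |dc|."""
    return abs(a[0] - b[0]) + abs(a[1] - b[1])

def euclidean(a, b):
    """Straight-line distance: sqrt(dr^2 + dc^2)."""
    return math.hypot(a[0] - b[0], a[1] - b[1])

def chebyshev(a, b):
    """8-directional movement: max(|dr|, |dc|)."""
    return max(abs(a[0] - b[0]), abs(a[1] - b[1]))

print(manhattan((0, 0), (4, 4)), euclidean((0, 0), (4, 4)), chebyshev((0, 0), (4, 4)))
# -> 8, ~5.66, 4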

Challenge 3: SHA-256 Implementation#

Implement SHA-256 hash function (simplified educational version).

Note: The solution below demonstrates the concept using hashlib rather than re-implementing the algorithm from scratch. Always use hashlib in production!

# For demonstration, we'll show the concept and use hashlib
import hashlib
import struct

def simple_sha256_concept(message: str) -> str:
    """
    Conceptual SHA-256 (using hashlib for correct implementation).
    
    SHA-256 process:
    1. Pad message to 512-bit blocks
    2. Initialize hash values (8 constants)
    3. Process each block:
       - Create message schedule (64 words)
       - Compression function (64 rounds)
       - Update hash values
    4. Concatenate final hash
    """
    # In production, use hashlib:
    hash_object = hashlib.sha256(message.encode())
    return hash_object.hexdigest()

print("=== Challenge 3: SHA-256 ===")

# Test messages
messages = [
    "hello",
    "Hello",  # Capitalization changes everything
    "hello world",
    "The quick brown fox jumps over the lazy dog",
    "",  # Empty string
]

print("\nSHA-256 Hashes:")
for msg in messages:
    hash_value = simple_sha256_concept(msg)
    display = msg if msg else "(empty)"
    print(f"\n'{display}'")
    print(f"  {hash_value}")

# Demonstrate properties
print("\n=== SHA-256 Properties ===")

# 1. Deterministic
h1 = simple_sha256_concept("test")
h2 = simple_sha256_concept("test")
print(f"1. Deterministic: {h1 == h2}")

# 2. Small change → completely different hash
h3 = simple_sha256_concept("test")
h4 = simple_sha256_concept("Test")
print(f"2. Avalanche effect:")
print(f"   'test': {h3}")
print(f"   'Test': {h4}")
print(f"   Different: {h3 != h4}")

# 3. Fixed output size (256 bits = 64 hex characters)
h5 = simple_sha256_concept("a")
h6 = simple_sha256_concept("a" * 1000)
print(f"\n3. Fixed size: {len(h5)} hex chars (256 bits)")
print(f"   Short input:  {len(h5)} chars")
print(f"   Long input:   {len(h6)} chars")

# 4. One-way (computationally infeasible to reverse)
print(f"\n4. One-way: Cannot reverse hash to get original message")
print(f"   (Try factoring 2^256 possibilities!)")

print("\n✓ SHA-256 demonstration complete!")
print("\nNote: Full implementation requires:")
print("  - Message padding (multiple of 512 bits)")
print("  - 64 constant values (fractional parts of cube roots)")
print("  - Bitwise operations (AND, OR, XOR, ROT, SHR)")
print("  - 64 rounds of compression per block")
print("  - See FIPS 180-4 specification for details")

Challenge 4: Decision Tree from Scratch#

Implement a decision tree classifier with Gini impurity.

import numpy as np
from collections import Counter

class Node:
    """Decision tree node."""
    def __init__(self, feature=None, threshold=None, left=None, right=None, value=None):
        self.feature = feature  # Feature index to split on
        self.threshold = threshold  # Threshold value
        self.left = left  # Left subtree
        self.right = right  # Right subtree
        self.value = value  # Class value for leaf nodes
    
    def is_leaf(self):
        return self.value is not None

class DecisionTreeClassifier:
    """
    Decision Tree Classifier using Gini impurity.
    
    Time: roughly O(n^2 * m * depth) for this simple implementation
          (n=samples, m=features); optimized libraries reach about O(n * m * log n)
    Space: O(number of tree nodes) plus O(depth) for recursion
    """
    def __init__(self, max_depth=10, min_samples_split=2):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.root = None
    
    def gini_impurity(self, y):
        """
        Calculate Gini impurity.
        
        Gini = 1 - Σ(p_i²) where p_i is proportion of class i
        Lower is better (0 = pure, 0.5 = maximum impurity for binary)
        """
        counter = Counter(y)
        impurity = 1.0
        
        for count in counter.values():
            prob = count / len(y)
            impurity -= prob ** 2
        
        return impurity
    
    def split(self, X, y, feature, threshold):
        """Split dataset based on feature and threshold."""
        left_mask = X[:, feature] <= threshold
        right_mask = ~left_mask
        
        return (X[left_mask], y[left_mask], X[right_mask], y[right_mask])
    
    def information_gain(self, y, y_left, y_right):
        """
        Calculate information gain from split.
        
        Gain = Gini(parent) - weighted_average(Gini(children))
        """
        parent_gini = self.gini_impurity(y)
        
        n = len(y)
        n_left, n_right = len(y_left), len(y_right)
        
        if n_left == 0 or n_right == 0:
            return 0
        
        child_gini = (n_left / n) * self.gini_impurity(y_left) + \
                     (n_right / n) * self.gini_impurity(y_right)
        
        return parent_gini - child_gini
    
    def best_split(self, X, y):
        """Find best feature and threshold to split on."""
        best_gain = -1
        best_feature = None
        best_threshold = None
        
        n_features = X.shape[1]
        
        for feature in range(n_features):
            thresholds = np.unique(X[:, feature])
            
            for threshold in thresholds:
                X_left, y_left, X_right, y_right = self.split(X, y, feature, threshold)
                
                gain = self.information_gain(y, y_left, y_right)
                
                if gain > best_gain:
                    best_gain = gain
                    best_feature = feature
                    best_threshold = threshold
        
        return best_feature, best_threshold
    
    def build_tree(self, X, y, depth=0):
        """Recursively build decision tree."""
        n_samples, n_features = X.shape
        n_classes = len(np.unique(y))
        
        # Stopping criteria
        if (depth >= self.max_depth or 
            n_samples < self.min_samples_split or 
            n_classes == 1):
            # Create leaf node with most common class
            leaf_value = Counter(y).most_common(1)[0][0]
            return Node(value=leaf_value)
        
        # Find best split
        best_feature, best_threshold = self.best_split(X, y)
        
        if best_feature is None:
            leaf_value = Counter(y).most_common(1)[0][0]
            return Node(value=leaf_value)
        
        # Split and recurse
        X_left, y_left, X_right, y_right = self.split(X, y, best_feature, best_threshold)
        
        left_subtree = self.build_tree(X_left, y_left, depth + 1)
        right_subtree = self.build_tree(X_right, y_right, depth + 1)
        
        return Node(feature=best_feature, threshold=best_threshold,
                   left=left_subtree, right=right_subtree)
    
    def fit(self, X, y):
        """Train the decision tree."""
        self.root = self.build_tree(X, y)
    
    def predict_sample(self, x, node):
        """Predict single sample."""
        if node.is_leaf():
            return node.value
        
        if x[node.feature] <= node.threshold:
            return self.predict_sample(x, node.left)
        else:
            return self.predict_sample(x, node.right)
    
    def predict(self, X):
        """Predict multiple samples."""
        return np.array([self.predict_sample(x, self.root) for x in X])

# Test
print("=== Challenge 4: Decision Tree ===")

from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Load iris dataset
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42
)

# Train our decision tree
dt = DecisionTreeClassifier(max_depth=10, min_samples_split=2)
dt.fit(X_train, y_train)

# Predict
predictions = dt.predict(X_test)

# Evaluate
accuracy = accuracy_score(y_test, predictions)
print(f"\nDecision Tree Accuracy: {accuracy:.4f}")

print("\nSample predictions:")
for i in range(5):
    print(f"  Predicted: {predictions[i]}, Actual: {y_test[i]}")

print("\n✓ Decision tree complete!")

Challenge 5: Topological Sort (Kahn’s Algorithm)#

Implement topological sorting for dependency resolution.

from collections import deque, defaultdict
from typing import List, Dict

def topological_sort_kahns(graph: Dict[int, List[int]], num_nodes: int) -> List[int]:
    """
    Kahn's algorithm for topological sorting.
    
    Time: O(V + E), Space: O(V)
    
    Args:
        graph: Adjacency list {node: [dependent_nodes]}
        num_nodes: Total number of nodes
    
    Returns:
        Topologically sorted order, or empty list if cycle exists
    """
    # Calculate in-degree for each node
    in_degree = {i: 0 for i in range(num_nodes)}
    
    for node in graph:
        for neighbor in graph[node]:
            in_degree[neighbor] += 1
    
    # Queue of nodes with no dependencies
    queue = deque([node for node in in_degree if in_degree[node] == 0])
    
    result = []
    
    while queue:
        node = queue.popleft()
        result.append(node)
        
        # Remove edge from graph
        for neighbor in graph.get(node, []):
            in_degree[neighbor] -= 1
            
            # If no more dependencies, add to queue
            if in_degree[neighbor] == 0:
                queue.append(neighbor)
    
    # Check for cycle
    if len(result) != num_nodes:
        return []  # Cycle detected
    
    return result

print("=== Challenge 5: Topological Sort ===")

# Example: Course prerequisites
# Nodes represent courses, edges represent "must take before"
print("\nExample: Course Prerequisites")
print("Courses: 0=Intro, 1=DataStruct, 2=Algorithms, 3=ML, 4=DeepLearning, 5=NLP")

# Graph: course -> courses that depend on it
prerequisites = {
    0: [1],        # Intro -> DataStruct
    1: [2],        # DataStruct -> Algorithms
    2: [3],        # Algorithms -> ML
    3: [4, 5],     # ML -> DeepLearning and NLP
    4: [],
    5: []
}

course_names = {
    0: "Introduction to CS",
    1: "Data Structures",
    2: "Algorithms",
    3: "Machine Learning",
    4: "Deep Learning",
    5: "NLP"
}

order = topological_sort_kahns(prerequisites, 6)

if order:
    print("\nValid course order:")
    for i, course in enumerate(order, 1):
        print(f"  {i}. {course_names[course]}")
else:
    print("\nNo valid order - cycle detected!")

# Example: Build system dependencies
print("\n\nExample: Build System Dependencies")
print("Modules: 0=utils, 1=config, 2=database, 3=api, 4=tests")

build_deps = {
    0: [1, 2],     # utils used by config and database
    1: [3],        # config used by api
    2: [3],        # database used by api
    3: [4],        # api used by tests
    4: []
}

module_names = {
    0: "utils",
    1: "config",
    2: "database",
    3: "api",
    4: "tests"
}

build_order = topological_sort_kahns(build_deps, 5)

print("\nBuild order:")
for i, module in enumerate(build_order, 1):
    print(f"  {i}. {module_names[module]}")

# Example: Cycle detection
print("\n\nExample: Detecting Cycles")
cyclic_graph = {
    0: [1],
    1: [2],
    2: [0],  # Cycle: 0 -> 1 -> 2 -> 0
}

result = topological_sort_kahns(cyclic_graph, 3)
if result:
    print(f"Order: {result}")
else:
    print("❌ Cycle detected! No valid topological order.")

print("\n✓ Topological sort complete!")

print("\n=== Applications ===")
print("- Task scheduling with dependencies")
print("- Build systems (make, npm, cargo)")
print("- Course prerequisites")
print("- Package managers (apt, pip)")
print("- Compilation order")
print("- Job scheduling in databases")

09 - CTF Challenges#

Important Note#

The CTF challenges notebook (09_ctf_challenges.ipynb) contains hands-on security exercises that are best completed interactively. The challenges cover:

  • Web: SQL injection, XSS

  • Binary: Buffer overflows, format strings

  • Reverse Engineering: Decompiling, crackmes

  • Cryptography: Breaking ciphers

  • Forensics: Steganography, file carving

  • OSINT: Metadata extraction

These challenges are designed to be worked through step-by-step with explanations in the original notebook. Rather than providing complete solutions here (which would defeat the learning purpose), we recommend:

  1. Read each challenge carefully

  2. Attempt it yourself first

  3. Use the hints provided

  4. Research the vulnerability/technique

  5. Consult CTF writeups if stuck

Additional CTF Resources#

Practice Platforms:

  • PicoCTF (beginner-friendly)

  • HackTheBox

  • TryHackMe

  • OverTheWire Wargames

Learning Resources:

  • CTFtime.org (calendar and writeups)

  • LiveOverflow YouTube channel

  • OWASP Top 10

Note: All techniques should only be used legally and ethically:

  • ✅ Authorized penetration testing

  • ✅ CTF competitions

  • ✅ Your own systems

  • ❌ Unauthorized access

  • ❌ Malicious use


Summary#

Congratulations on completing the Hard level solutions! You’ve implemented:

Algorithms & Data Structures#

  • Advanced decorators with memoization

  • Generator pipelines

  • Dijkstra’s shortest path

  • A* search with heuristics

  • Topological sort (Kahn’s algorithm)

  • Decision trees from scratch

Machine Learning & Deep Learning#

  • CNN for CIFAR-10 image classification

  • Text classification with multiple vectorization methods

  • Advanced preprocessing and feature engineering

Systems & Theory#

  • Round Robin CPU scheduler

  • Process scheduling metrics

  • Cryptography (SHA-256 concepts)

Problem-Solving Patterns#

  • Hash maps for optimization

  • Two pointers technique

  • Dynamic programming

  • Graph algorithms

  • Greedy algorithms

  • Divide and conquer

Next Steps#

  1. Practice regularly: Solve 1-2 problems daily on LeetCode/Codeforces

  2. Build projects: Apply these algorithms to real-world problems

  3. Read research papers: Stay current with SOTA techniques

  4. Participate in competitions: Kaggle, CTFs, coding contests

  5. Contribute to open source: Real-world experience

Mastery Checklist#

You’ve mastered Hard level when you can:

  • Solve LeetCode Hard problems in 45 minutes

  • Explain algorithm complexity intuitively

  • Implement ML algorithms from scratch

  • Design system architectures

  • Debug complex concurrent systems

  • Optimize for both time and space

  • Teach concepts to others clearly

Remember: These are advanced topics that take years to master. Be patient with yourself and enjoy the journey! 🚀