Reactive Programming in Flutter - From Basic to Advanced

Author: Phat Tran

Introduction

Reactive programming changes how we think about data flow. Instead of asking for data when we need it, we subscribe to a source and react whenever it changes. Flutter builds on this pattern throughout, from StreamBuilder to ChangeNotifier. This guide takes you from the basics to advanced patterns.

Part 1: Foundation - Understanding Reactive Programming

1. What is Reactive Programming?

Reactive programming is about data streams and change propagation. When data changes, all listeners update automatically.

Imperative vs Reactive Comparison:

┌──────────────────────────────────────────────┐
│ IMPERATIVE APPROACH                          │
├──────────────────────────────────────────────┤
│ 1. User clicks button                        │
│ 2. Code calls fetchData()                    │
│ 3. Code waits for response                   │
│ 4. Code manually updates UI                  │
│ 5. User clicks again → repeat all steps      │
└──────────────────────────────────────────────┘

┌──────────────────────────────────────────────┐
│ REACTIVE APPROACH                            │
├──────────────────────────────────────────────┤
│ 1. UI subscribes to data stream (once)       │
│ 2. Data changes → UI updates automatically   │
│ 3. No manual refresh needed                  │
│ 4. Multiple listeners can react to same data │
└──────────────────────────────────────────────┘
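
As a rough illustration of the two boxes above, the sketch below exposes the same value both ways: a getter the caller must poll, and a stream the caller subscribes to once. `TemperatureSensor`, `read`, `changes`, and `collectReadings` are hypothetical names invented for this example; they are not part of Dart or Flutter.

```dart
import 'dart:async';

/// Hypothetical data source used only for illustration.
class TemperatureSensor {
  final _controller = StreamController<double>.broadcast();
  double _latest = 20.0;

  /// Imperative access: the caller has to ask for the value every time.
  double read() => _latest;

  /// Reactive access: the caller subscribes once and is pushed every change.
  Stream<double> get changes => _controller.stream;

  void update(double value) {
    _latest = value;
    _controller.add(value);
  }

  Future<void> dispose() => _controller.close();
}

/// Subscribes once, then records every reading pushed by the sensor.
Future<List<double>> collectReadings(List<double> updates) async {
  final sensor = TemperatureSensor();
  final seen = <double>[];
  final subscription = sensor.changes.listen(seen.add);

  for (final value in updates) {
    sensor.update(value);
  }

  // Events are delivered asynchronously, so yield once before cancelling.
  await Future<void>.delayed(Duration.zero);
  await subscription.cancel();
  await sensor.dispose();
  return seen;
}
```

Calling `collectReadings([21.5, 22.0])` should complete with both readings, without the caller ever invoking `read()`.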

Benefits in Flutter:

  • Automatic UI updates when data changes
  • Clean separation between data and UI
  • Easy to handle async operations
  • Better code organization
  • Reduced boilerplate

2. Streams in Dart

Streams are the foundation of reactive programming in Dart.

Single Subscription vs Broadcast Streams:

// pubspec.yaml - No extra dependencies needed for basic streams

// single_vs_broadcast.dart
import 'dart:async';

// Single-subscription stream: only one listener is allowed
void singleSubscriptionExample() {
  // Stream.periodic creates a stream that accepts a single listener
  final stream = Stream.periodic(
    const Duration(seconds: 1),
    (count) => count,
  ).take(5);

  // First listener: OK
  stream.listen((data) {
    print('Listener 1: $data');
  });

  // Second listener: ERROR, the stream already has a listener
  // stream.listen((data) => print('Listener 2: $data')); // Bad: StateError
}

// Broadcast stream: multiple listeners are allowed
void broadcastExample() {
  // Create a broadcast controller that accepts multiple listeners
  final controller = StreamController<int>.broadcast();

  // Listener 1: OK
  controller.stream.listen((data) {
    print('Listener 1: $data');
  });

  // Listener 2: OK because the stream is broadcast
  controller.stream.listen((data) {
    print('Listener 2: $data');
  });

  // Emit data: both listeners receive it
  controller.add(1);
  controller.add(2);
  controller.add(3);

  // Don't forget to close the controller
  controller.close();
}
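
Two consequences of the distinction above are easy to check in isolation. The sketch below (with hypothetical function names `secondListenThrows` and `lateListenerEvents`) confirms that a second `listen` on a single-subscription stream throws a `StateError`, and that a broadcast stream does not buffer events for listeners that subscribe later.

```dart
import 'dart:async';

/// Returns true if a second listener on a single-subscription stream throws.
bool secondListenThrows() {
  final controller = StreamController<int>();
  final first = controller.stream.listen((_) {});
  try {
    controller.stream.listen((_) {}); // second listener
    return false;
  } on StateError {
    return true;
  } finally {
    first.cancel();
    controller.close();
  }
}

/// Collects what a late subscriber to a broadcast stream actually receives.
Future<List<int>> lateListenerEvents() async {
  final controller = StreamController<int>.broadcast();
  final received = <int>[];

  controller.add(1); // no listener yet, so this event is dropped
  final subscription = controller.stream.listen(received.add);
  controller.add(2); // delivered to the listener

  // Let the pending event be delivered before cleaning up.
  await Future<void>.delayed(Duration.zero);
  await subscription.cancel();
  await controller.close();
  return received;
}
```

If late subscribers need the most recent value, a plain broadcast stream is not enough; that is the gap the RxDart subjects later in this guide are meant to fill.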

StreamController Usage:

// stream_controller_example.dart
import 'dart:async';

class CounterService {
  // Create a StreamController to manage the stream
  final _counterController = StreamController<int>.broadcast();

  // Holds the current value
  int _count = 0;

  // Expose the stream so widgets can listen to it
  Stream<int> get counterStream => _counterController.stream;

  // Read the current value
  int get currentCount => _count;

  // Increment the counter and emit the new value
  void increment() {
    _count++;
    _counterController.add(_count); // Emit the new value on the stream
  }

  // Decrement the counter
  void decrement() {
    _count--;
    _counterController.add(_count);
  }

  // Reset to 0
  void reset() {
    _count = 0;
    _counterController.add(_count);
  }

  // IMPORTANT: close the controller when it is no longer needed
  void dispose() {
    _counterController.close();
  }
}

Stream Methods - listen, where, map, transform:

// stream_methods.dart
import 'dart:async';

void streamMethodsExample() {
  // A broadcast controller lets each derived stream below have its own listener
  final controller = StreamController<int>.broadcast();

  // map: transform each value
  final doubledStream = controller.stream.map((value) => value * 2);

  // where: keep only the values that match a condition
  final evenStream = controller.stream.where((value) => value % 2 == 0);

  // transform: for more complex conversions
  final transformer = StreamTransformer<int, String>.fromHandlers(
    handleData: (data, sink) {
      // Turn the number into a descriptive label
      if (data > 10) {
        sink.add('Large: $data');
      } else {
        sink.add('Small: $data');
      }
    },
  );

  final transformedStream = controller.stream.transform(transformer);

  // Derived streams do no work until something listens to them
  doubledStream.listen((value) => print('Doubled: $value'));
  evenStream.listen((value) => print('Even: $value'));
  transformedStream.listen(print);

  // listen with the full set of callbacks
  controller.stream.listen(
    (data) {
      print('Data: $data'); // Handle data
    },
    onError: (error) {
      print('Error: $error'); // Handle errors
    },
    onDone: () {
      print('Stream closed'); // The stream has closed
    },
    cancelOnError: false, // Keep listening after an error
  );

  // Emit test data
  controller.add(5);
  controller.add(10);
  controller.add(15);
  controller.close();
}
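
The operators above also chain into a single pipeline. The following is a minimal sketch, assuming a finite input so the result can be collected with `toList()`; `labeller` and `describeDoubledEvens` are hypothetical names for this example.

```dart
import 'dart:async';

/// Labels each number, in the same spirit as the transformer shown above.
final labeller = StreamTransformer<int, String>.fromHandlers(
  handleData: (value, sink) =>
      sink.add(value > 10 ? 'Large: $value' : 'Small: $value'),
);

/// Keeps the even numbers, doubles them, and converts them to labels.
Future<List<String>> describeDoubledEvens(Iterable<int> input) {
  return Stream.fromIterable(input)
      .where((value) => value.isEven) // keep even numbers
      .map((value) => value * 2) // double them
      .transform(labeller) // convert to labels
      .toList(); // completes when the source stream closes
}
```

For the input `[5, 10, 15, 4]` this yields `['Large: 20', 'Small: 8']`: only 10 and 4 pass the filter, and they are doubled before being labelled.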

Error Handling:

// error_handling.dart
import 'dart:async';

class DataService {
  final _controller = StreamController<String>.broadcast();

  Stream<String> get dataStream => _controller.stream;

  Future<void> fetchData() async {
    try {
      // Simulate an API call
      await Future.delayed(const Duration(seconds: 1));

      // Simulate a random failure
      if (DateTime.now().second % 2 == 0) {
        throw Exception('Network error');
      }

      _controller.add('Data loaded successfully');
    } catch (e) {
      // Forward the error to listeners through the stream
      _controller.addError(e);
    }
  }

  void dispose() {
    _controller.close();
  }
}

// Usage with error handling
void useDataService() {
  final service = DataService();

  service.dataStream.listen(
    (data) => print('Success: $data'),
    onError: (error) {
      // Handle the error here
      print('Error occurred: $error');
      // You could retry, show a snackbar, etc.
    },
  );

  service.fetchData();
}
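
One detail worth seeing in isolation: an error added with `addError` does not end the stream. As long as the listener supplies `onError` and leaves `cancelOnError` at its default of `false`, later data events still arrive. A small sketch (the function name `logEvents` is hypothetical):

```dart
import 'dart:async';

/// Records data and error events in the order they are delivered.
Future<List<String>> logEvents() async {
  final controller = StreamController<int>();
  final log = <String>[];

  controller.stream.listen(
    (value) => log.add('data $value'),
    onError: (Object error) => log.add('error $error'),
  );

  controller
    ..add(1)
    ..addError('boom') // an error event, not a termination
    ..add(2);

  // close() completes once the listener has received the done event.
  await controller.close();
  return log;
}
```

The resulting log is `['data 1', 'error boom', 'data 2']`, which shows the subscription survived the error.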

3. StreamBuilder Widget

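StreamBuilder connects streams to Flutter widgets. It subscribes to a stream and rebuilds its subtree whenever a new snapshot arrives, so you do not need to call setState yourself.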

// stream_builder_example.dart
import 'package:flutter/material.dart';
import 'dart:async';

class CounterPage extends StatefulWidget {
  const CounterPage({super.key});

  @override
  State<CounterPage> createState() => _CounterPageState();
}

class _CounterPageState extends State<CounterPage> {
  // Controller that manages the stream
  final _controller = StreamController<int>.broadcast();
  int _count = 0;

  @override
  void dispose() {
    // IMPORTANT: close the controller to avoid a memory leak
    _controller.close();
    super.dispose();
  }

  void _increment() {
    _count++;
    _controller.add(_count);
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: const Text('Stream Counter')),
      body: Center(
        child: StreamBuilder<int>(
          stream: _controller.stream,
          initialData: 0, // Value shown before the first event arrives
          builder: (context, snapshot) {
            if (snapshot.hasError) {
              return Text('Error: ${snapshot.error}');
            }

            // Handle each connection state
            switch (snapshot.connectionState) {
              case ConnectionState.none:
                // No stream attached
                return const Text('No stream');

              case ConnectionState.waiting:
              // Subscribed but no event yet. Because initialData is set,
              // snapshot.data already holds 0, so render it instead of a
              // spinner that would stay until the first tap.
              case ConnectionState.active:
                // The stream is active and has emitted data
                return Text(
                  'Count: ${snapshot.data}',
                  style: const TextStyle(fontSize: 48),
                );

              case ConnectionState.done:
                // The stream has closed
                return const Text('Stream closed');
            }
          },
        ),
      ),
      floatingActionButton: FloatingActionButton(
        onPressed: _increment,
        child: const Icon(Icons.add),
      ),
    );
  }
}

Real-time Chat Message Example:

// chat_stream_example.dart
import 'package:flutter/material.dart';
import 'dart:async';

// Model for a chat message
class ChatMessage {
  final String id;
  final String text;
  final String sender;
  final DateTime timestamp;

  ChatMessage({
    required this.id,
    required this.text,
    required this.sender,
    required this.timestamp,
  });
}

// Service that manages chat messages
class ChatService {
  final _messagesController = StreamController<List<ChatMessage>>.broadcast();
  final List<ChatMessage> _messages = [];

  Stream<List<ChatMessage>> get messagesStream => _messagesController.stream;

  void addMessage(String text, String sender) {
    final message = ChatMessage(
      id: DateTime.now().millisecondsSinceEpoch.toString(),
      text: text,
      sender: sender,
      timestamp: DateTime.now(),
    );
    _messages.add(message);
    // Emit a copy of the updated message list
    _messagesController.add(List.from(_messages));
  }

  void dispose() {
    _messagesController.close();
  }
}

// Widget that displays the chat
class ChatWidget extends StatefulWidget {
  const ChatWidget({super.key});

  @override
  State<ChatWidget> createState() => _ChatWidgetState();
}

class _ChatWidgetState extends State<ChatWidget> {
  final _chatService = ChatService();
  final _textController = TextEditingController();

  @override
  void dispose() {
    _chatService.dispose();
    _textController.dispose();
    super.dispose();
  }

  void _sendMessage() {
    if (_textController.text.isNotEmpty) {
      _chatService.addMessage(_textController.text, 'User');
      _textController.clear();
    }
  }

  @override
  Widget build(BuildContext context) {
    return Column(
      children: [
        // Message list driven by StreamBuilder
        Expanded(
          child: StreamBuilder<List<ChatMessage>>(
            stream: _chatService.messagesStream,
            initialData: const [],
            builder: (context, snapshot) {
              final messages = snapshot.data ?? [];

              if (messages.isEmpty) {
                return const Center(
                  child: Text('No messages yet'),
                );
              }

              return ListView.builder(
                itemCount: messages.length,
                itemBuilder: (context, index) {
                  final message = messages[index];
                  return ListTile(
                    title: Text(message.text),
                    subtitle: Text(message.sender),
                    trailing: Text(
                      // Pad the minute so 9:05 is not rendered as 9:5
                      '${message.timestamp.hour}:'
                      '${message.timestamp.minute.toString().padLeft(2, '0')}',
                    ),
                  );
                },
              );
            },
          ),
        ),
        // Input field
        Padding(
          padding: const EdgeInsets.all(8.0),
          child: Row(
            children: [
              Expanded(
                child: TextField(
                  controller: _textController,
                  decoration: const InputDecoration(
                    hintText: 'Type a message...',
                  ),
                ),
              ),
              IconButton(
                onPressed: _sendMessage,
                icon: const Icon(Icons.send),
              ),
            ],
          ),
        ),
      ],
    );
  }
}

Part 2: Reactive State Management

4. ValueNotifier & ValueListenableBuilder

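ValueNotifier is simpler than a stream. It holds a single value and notifies its listeners synchronously whenever that value changes, which makes it a good fit for one piece of UI state.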

// value_notifier_example.dart
import 'package:flutter/material.dart';

// When to use ValueNotifier:
// - Managing a simple single value
// - No need to transform the data
// - Better performance than streams for simple cases

class ThemeSwitcher extends StatefulWidget {
  const ThemeSwitcher({super.key});

  @override
  State<ThemeSwitcher> createState() => _ThemeSwitcherState();
}

class _ThemeSwitcherState extends State<ThemeSwitcher> {
  // ValueNotifier for the theme mode
  final _isDarkMode = ValueNotifier<bool>(false);

  // ValueNotifier for form validation
  final _isFormValid = ValueNotifier<bool>(false);
  final _emailController = TextEditingController();

  @override
  void initState() {
    super.initState();
    // Listen for email changes to validate the form
    _emailController.addListener(_validateForm);
  }

  void _validateForm() {
    final email = _emailController.text;
    // Validate email format
    final isValid = email.contains('@') && email.contains('.');
    _isFormValid.value = isValid; // Automatically notifies listeners
  }

  @override
  void dispose() {
    _isDarkMode.dispose();
    _isFormValid.dispose();
    _emailController.dispose();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return Column(
      children: [
        // Theme switcher using ValueListenableBuilder
        ValueListenableBuilder<bool>(
          valueListenable: _isDarkMode,
          builder: (context, isDark, child) {
            return SwitchListTile(
              title: Text(isDark ? 'Dark Mode' : 'Light Mode'),
              value: isDark,
              onChanged: (value) {
                _isDarkMode.value = value; // Update the value
              },
            );
          },
        ),

        // Form with validation
        Padding(
          padding: const EdgeInsets.all(16.0),
          child: TextField(
            controller: _emailController,
            decoration: const InputDecoration(
              labelText: 'Email',
              hintText: 'Enter your email',
            ),
          ),
        ),

        // Submit button: enabled only when the form is valid
        ValueListenableBuilder<bool>(
          valueListenable: _isFormValid,
          builder: (context, isValid, child) {
            return ElevatedButton(
              // Disable the button when the form is invalid
              onPressed: isValid ? () => print('Submit!') : null,
              child: const Text('Submit'),
            );
          },
        ),
      ],
    );
  }
}

Comparison: ValueNotifier vs StreamBuilder:

┌────────────────────────┬────────────────────────┐
│ ValueNotifier          │ StreamBuilder          │
├────────────────────────┼────────────────────────┤
│ Single value only      │ Multiple values        │
│ Sync updates           │ Async support          │
│ No transformations     │ map, where, etc.       │
│ Simpler API            │ More powerful          │
│ Less memory            │ More overhead          │
│ Good for UI state      │ Good for data flow     │
└────────────────────────┴────────────────────────┘

5. ChangeNotifier & Provider

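Provider is a widely used state management package and a common starting point for Flutter apps. Paired with ChangeNotifier, it exposes a model to the widget tree and rebuilds only the widgets that listen to it.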

# pubspec.yaml
dependencies:
  flutter:
    sdk: flutter
  provider: ^6.1.1

// shopping_cart_provider.dart
import 'package:flutter/material.dart';
import 'package:provider/provider.dart';

// Model for a product
class Product {
  final String id;
  final String name;
  final double price;

  Product({required this.id, required this.name, required this.price});
}

// ChangeNotifier for the shopping cart
class CartProvider extends ChangeNotifier {
  final List<Product> _items = [];

  // Read-only view of the items
  List<Product> get items => List.unmodifiable(_items);

  // Total number of items
  int get itemCount => _items.length;

  // Total price
  double get totalPrice {
    // Explicit <double> and 0.0 keep the accumulator typed as double
    return _items.fold<double>(0.0, (sum, item) => sum + item.price);
  }

  // Add a product to the cart
  void addItem(Product product) {
    _items.add(product);
    notifyListeners(); // Notify all listeners
  }

  // Remove a product from the cart
  void removeItem(String productId) {
    _items.removeWhere((item) => item.id == productId);
    notifyListeners();
  }

  // Remove everything
  void clearCart() {
    _items.clear();
    notifyListeners();
  }
}

// User authentication provider
class AuthProvider extends ChangeNotifier {
  String? _userId;
  String? _email;
  bool _isLoading = false;

  bool get isLoggedIn => _userId != null;
  String? get userId => _userId;
  String? get email => _email;
  bool get isLoading => _isLoading;

  Future<void> login(String email, String password) async {
    _isLoading = true;
    notifyListeners();

    try {
      // Simulate an API call
      await Future.delayed(const Duration(seconds: 2));

      // Simulate success
      _userId = 'user_123';
      _email = email;
    } catch (e) {
      // Handle error
      rethrow;
    } finally {
      _isLoading = false;
      notifyListeners();
    }
  }

  void logout() {
    _userId = null;
    _email = null;
    notifyListeners();
  }
}

// App with MultiProvider
class MyApp extends StatelessWidget {
  const MyApp({super.key});

  @override
  Widget build(BuildContext context) {
    return MultiProvider(
      providers: [
        // Register the providers
        ChangeNotifierProvider(create: (_) => CartProvider()),
        ChangeNotifierProvider(create: (_) => AuthProvider()),
      ],
      child: MaterialApp(
        home: const ShoppingPage(),
      ),
    );
  }
}

// Shopping page that uses the providers
class ShoppingPage extends StatelessWidget {
  const ShoppingPage({super.key});

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: const Text('Shop'),
        actions: [
          // Cart icon with a badge
          Consumer<CartProvider>(
            builder: (context, cart, child) {
              return Badge(
                label: Text('${cart.itemCount}'),
                child: IconButton(
                  icon: const Icon(Icons.shopping_cart),
                  onPressed: () {
                    // Navigate to cart
                  },
                ),
              );
            },
          ),
        ],
      ),
      body: Column(
        children: [
          // User info
          Consumer<AuthProvider>(
            builder: (context, auth, child) {
              if (auth.isLoading) {
                return const CircularProgressIndicator();
              }
              if (auth.isLoggedIn) {
                return Text('Welcome, ${auth.email}');
              }
              return ElevatedButton(
                onPressed: () {
                  auth.login('test@email.com', 'password');
                },
                child: const Text('Login'),
              );
            },
          ),

          // Product list
          Expanded(
            child: ListView(
              children: [
                _ProductItem(
                  product: Product(id: '1', name: 'Item 1', price: 10.0),
                ),
                _ProductItem(
                  product: Product(id: '2', name: 'Item 2', price: 20.0),
                ),
              ],
            ),
          ),

          // Total price: only rebuilds when the cart changes
          Consumer<CartProvider>(
            builder: (context, cart, child) {
              return Padding(
                padding: const EdgeInsets.all(16.0),
                child: Text(
                  'Total: \$${cart.totalPrice.toStringAsFixed(2)}',
                  style: const TextStyle(fontSize: 24),
                ),
              );
            },
          ),
        ],
      ),
    );
  }
}

class _ProductItem extends StatelessWidget {
  final Product product;

  const _ProductItem({required this.product});

  @override
  Widget build(BuildContext context) {
    return ListTile(
      title: Text(product.name),
      subtitle: Text('\$${product.price.toStringAsFixed(2)}'),
      trailing: IconButton(
        icon: const Icon(Icons.add_shopping_cart),
        onPressed: () {
          // Use context.read so this widget does not rebuild when the cart changes
          context.read<CartProvider>().addItem(product);
        },
      ),
    );
  }
}

6. Riverpod - Modern Reactive Approach

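Riverpod, written by the author of Provider, addresses several of Provider's limitations. Providers are declared as top-level variables rather than widgets, so they do not depend on BuildContext, and loading, error, and data states are modeled explicitly through AsyncValue.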

# pubspec.yaml
dependencies:
  flutter:
    sdk: flutter
  flutter_riverpod: ^2.4.9
  dio: ^5.4.0  # For API calls

// riverpod_example.dart
import 'package:flutter/material.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:dio/dio.dart';

// Model
class User {
  final int id;
  final String name;
  final String email;

  User({required this.id, required this.name, required this.email});

  factory User.fromJson(Map<String, dynamic> json) {
    return User(
      id: json['id'],
      name: json['name'],
      email: json['email'],
    );
  }
}

// StateProvider: for simple state
final counterProvider = StateProvider<int>((ref) => 0);

// StateNotifierProvider: for complex state
class TodoNotifier extends StateNotifier<List<String>> {
  TodoNotifier() : super([]);

  void add(String todo) {
    state = [...state, todo];
  }

  void remove(int index) {
    state = [
      for (int i = 0; i < state.length; i++)
        if (i != index) state[i],
    ];
  }
}

final todoProvider = StateNotifierProvider<TodoNotifier, List<String>>((ref) {
  return TodoNotifier();
});

// FutureProvider: for async data
final userProvider = FutureProvider.autoDispose<User>((ref) async {
  final dio = Dio();
  final response = await dio.get('https://jsonplaceholder.typicode.com/users/1');
  return User.fromJson(response.data);
});

// FutureProvider.family: takes a parameter
final userByIdProvider = FutureProvider.autoDispose.family<User, int>((ref, userId) async {
  final dio = Dio();
  final response = await dio.get('https://jsonplaceholder.typicode.com/users/$userId');
  return User.fromJson(response.data);
});

// StreamProvider: for real-time data
final timerProvider = StreamProvider.autoDispose<int>((ref) {
  return Stream.periodic(
    const Duration(seconds: 1),
    (count) => count,
  );
});

// App
class RiverpodApp extends StatelessWidget {
  const RiverpodApp({super.key});

  @override
  Widget build(BuildContext context) {
    return ProviderScope(
      child: MaterialApp(
        home: const RiverpodPage(),
      ),
    );
  }
}

class RiverpodPage extends ConsumerWidget {
  const RiverpodPage({super.key});

  @override
  Widget build(BuildContext context, WidgetRef ref) {
    // Watch counter
    final counter = ref.watch(counterProvider);

    // Watch user - AsyncValue handles loading/error/data
    final userAsync = ref.watch(userProvider);

    // Watch timer stream
    final timerAsync = ref.watch(timerProvider);

    return Scaffold(
      appBar: AppBar(title: const Text('Riverpod Demo')),
      body: ListView(
        padding: const EdgeInsets.all(16),
        children: [
          // Counter section
          Text('Counter: $counter', style: const TextStyle(fontSize: 24)),
          Row(
            children: [
              ElevatedButton(
                onPressed: () => ref.read(counterProvider.notifier).state++,
                child: const Text('+'),
              ),
              const SizedBox(width: 8),
              ElevatedButton(
                onPressed: () => ref.read(counterProvider.notifier).state--,
                child: const Text('-'),
              ),
            ],
          ),

          const Divider(height: 32),

          // User section using the AsyncValue pattern
          userAsync.when(
            loading: () => const CircularProgressIndicator(),
            error: (error, stack) => Text('Error: $error'),
            data: (user) => Column(
              crossAxisAlignment: CrossAxisAlignment.start,
              children: [
                Text('Name: ${user.name}'),
                Text('Email: ${user.email}'),
              ],
            ),
          ),

          const Divider(height: 32),

          // Timer section
          timerAsync.when(
            loading: () => const Text('Starting timer...'),
            error: (e, s) => Text('Error: $e'),
            data: (seconds) => Text('Timer: $seconds seconds'),
          ),

          const Divider(height: 32),

          // Todo section
          const _TodoSection(),
        ],
      ),
    );
  }
}

class _TodoSection extends ConsumerWidget {
  const _TodoSection();

  @override
  Widget build(BuildContext context, WidgetRef ref) {
    final todos = ref.watch(todoProvider);

    return Column(
      crossAxisAlignment: CrossAxisAlignment.start,
      children: [
        const Text('Todos:', style: TextStyle(fontSize: 20)),
        ElevatedButton(
          onPressed: () {
            ref.read(todoProvider.notifier).add('Todo ${todos.length + 1}');
          },
          child: const Text('Add Todo'),
        ),
        ...todos.asMap().entries.map((entry) {
          return ListTile(
            title: Text(entry.value),
            trailing: IconButton(
              icon: const Icon(Icons.delete),
              onPressed: () {
                ref.read(todoProvider.notifier).remove(entry.key);
              },
            ),
          );
        }),
      ],
    );
  }
}

Part 3: RxDart - Reactive Extensions

7. Introduction to RxDart

RxDart extends Dart's Stream API with Rx-style subjects and operators such as debounceTime, switchMap, and combineLatest.

# pubspec.yaml
dependencies:
  flutter:
    sdk: flutter
  rxdart: ^0.27.7

// rxdart_basics.dart
import 'package:rxdart/rxdart.dart';

void subjectsExample() {
  // PublishSubject: only delivers events emitted after a listener subscribes
  final publishSubject = PublishSubject<int>();
  publishSubject.add(1); // Nobody receives this
  publishSubject.listen((value) => print('Publish: $value'));
  publishSubject.add(2); // The listener receives this

  // BehaviorSubject: replays the latest value to each new subscriber
  final behaviorSubject = BehaviorSubject<int>.seeded(0);
  behaviorSubject.add(1);
  behaviorSubject.add(2);
  behaviorSubject.listen((value) => print('Behavior: $value')); // Receives 2

  // ReplaySubject: replays the last N values
  final replaySubject = ReplaySubject<int>(maxSize: 2);
  replaySubject.add(1);
  replaySubject.add(2);
  replaySubject.add(3);
  replaySubject.listen((value) => print('Replay: $value')); // Receives 2, 3

  // Close the subjects
  publishSubject.close();
  behaviorSubject.close();
  replaySubject.close();
}

8. Advanced Operators

// rxdart_operators.dart
import 'package:rxdart/rxdart.dart';
import 'dart:async';

void transformationOperators() {
  // switchMap: cancels the previous inner stream when a new event arrives
  // Use case: search API, drop stale results while the user keeps typing
  final searchController = BehaviorSubject<String>();

  searchController
      .switchMap((query) {
        // Each new query cancels the subscription to the previous inner stream
        return Stream.fromFuture(searchApi(query));
      })
      .listen((results) => print('Results: $results'));

  // flatMap: keeps earlier inner streams alive and runs them concurrently
  // Use case: Multiple file uploads
  final uploadSubject = PublishSubject<String>();

  uploadSubject
      .flatMap((file) => Stream.fromFuture(uploadFile(file)))
      .listen((result) => print(result)); // uploadFile already prefixes 'Uploaded: '

  // concatMap: waits for the previous inner stream to finish before starting the next
  // Use case: Sequential API calls
  final taskSubject = PublishSubject<int>();

  taskSubject
      .concatMap((taskId) => Stream.fromFuture(processTask(taskId)))
      .listen((result) => print('Task done: $result'));
}

void filteringOperators() {
  final inputController = BehaviorSubject<String>();

  // debounceTime: wait until the user stops typing
  inputController
      .debounceTime(const Duration(milliseconds: 300))
      .listen((value) => print('Debounced: $value'));

  // distinct: skip consecutive duplicate values
  // (Dart's built-in distinct() is what Rx calls distinctUntilChanged)
  inputController
      .distinct()
      .listen((value) => print('Distinct: $value'));

  // throttleTime: emit the first value in each time window
  inputController
      .throttleTime(const Duration(seconds: 1))
      .listen((value) => print('Throttled: $value'));

  // Test
  inputController.add('a');
  inputController.add('a'); // Skipped by distinct
  inputController.add('ab');
  inputController.add('abc');
}

void combinationOperators() {
  final stream1 = BehaviorSubject<String>.seeded('Hello');
  final stream2 = BehaviorSubject<String>.seeded('World');

  // combineLatest: combine the latest value from each stream
  Rx.combineLatest2(stream1, stream2, (a, b) => '$a $b')
      .listen((value) => print('Combined: $value'));

  // merge: interleave several streams into one
  Rx.merge([stream1, stream2])
      .listen((value) => print('Merged: $value'));

  // zip: pair values from each stream by position
  Rx.zip2(stream1, stream2, (a, b) => '$a-$b')
      .listen((value) => print('Zipped: $value'));
}

void utilityOperators() {
  final subject = PublishSubject<int>();

  // delay: postpone every event
  subject
      .delay(const Duration(seconds: 1))
      .listen((value) => print('Delayed: $value'));

  // retry: resubscribe when the stream errors.
  // Rx.retry takes a factory so that each attempt starts a fresh request.
  Rx.retry(
    () => Stream.fromFuture(fetchDataMayFail())
        .doOnError((e, s) => print('Error, retrying...')),
    3,
  ).listen(
    (data) => print('Data: $data'),
    onError: (e) => print('Giving up: $e'),
  );

  // timeout: emit an error if no event arrives in time
  subject
      .timeout(const Duration(seconds: 5))
      .listen(
        (value) => print('Value: $value'),
        onError: (e) => print('Timeout!'),
      );
}

// Helper functions
Future<List<String>> searchApi(String query) async {
  await Future.delayed(const Duration(milliseconds: 500));
  return ['Result for: $query'];
}

Future<String> uploadFile(String file) async {
  await Future.delayed(const Duration(seconds: 1));
  return 'Uploaded: $file';
}

Future<String> processTask(int taskId) async {
  await Future.delayed(const Duration(milliseconds: 500));
  return 'Task $taskId completed';
}

Future<String> fetchDataMayFail() async {
  if (DateTime.now().millisecond % 2 == 0) {
    throw Exception('Random failure');
  }
  return 'Success';
}
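
To make the behavior of debounceTime in filteringOperators more concrete, here is a simplified debounce written with only dart:async. It is a sketch for illustration, not RxDart's implementation: the `debounce` function is a hypothetical helper, it ignores pause and resume, and flushing the last pending event when the source closes is an assumption of this sketch.

```dart
import 'dart:async';

/// Emits an event only after [quiet] has passed without a newer event.
/// Hypothetical helper for illustration; not how RxDart implements it.
Stream<T> debounce<T>(Stream<T> source, Duration quiet) {
  late final StreamController<T> controller;
  StreamSubscription<T>? subscription;
  Timer? timer;
  T? pending;
  var hasPending = false;

  controller = StreamController<T>(
    onListen: () {
      subscription = source.listen(
        (event) {
          // A newer event replaces whatever was waiting.
          timer?.cancel();
          pending = event;
          hasPending = true;
          timer = Timer(quiet, () {
            hasPending = false;
            controller.add(event);
          });
        },
        onError: controller.addError,
        onDone: () {
          // Assumption of this sketch: flush the last pending event on close.
          timer?.cancel();
          if (hasPending) controller.add(pending as T);
          controller.close();
        },
      );
    },
    onCancel: () async {
      timer?.cancel();
      await subscription?.cancel();
    },
  );
  return controller.stream;
}
```

Feeding it `Stream.fromIterable(['a', 'ab', 'abc'])` with a 300 ms window produces only `'abc'`: the three events arrive back to back, so each one cancels the timer of the one before it.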

9. Reactive Form Handling

// reactive_form.dart
import 'package:flutter/material.dart';
import 'package:rxdart/rxdart.dart';

class ReactiveFormPage extends StatefulWidget {
  const ReactiveFormPage({super.key});

  @override
  State<ReactiveFormPage> createState() => _ReactiveFormPageState();
}

class _ReactiveFormPageState extends State<ReactiveFormPage> {
  // One subject per field
  final _emailSubject = BehaviorSubject<String>.seeded('');
  final _passwordSubject = BehaviorSubject<String>.seeded('');
  final _confirmPasswordSubject = BehaviorSubject<String>.seeded('');

  // Validation stream for the email field
  Stream<String?> get emailError => _emailSubject.stream
      .debounceTime(const Duration(milliseconds: 300))
      .map((email) {
        if (email.isEmpty) return 'Email is required';
        if (!email.contains('@')) return 'Invalid email format';
        if (!email.contains('.')) return 'Invalid email format';
        return null; // No error
      });

  // Validation stream for the password field
  Stream<String?> get passwordError => _passwordSubject.stream
      .debounceTime(const Duration(milliseconds: 300))
      .map((password) {
        if (password.isEmpty) return 'Password is required';
        if (password.length < 8) return 'Password must be at least 8 characters';
        if (!password.contains(RegExp(r'[A-Z]'))) {
          return 'Password must contain uppercase letter';
        }
        if (!password.contains(RegExp(r'[0-9]'))) {
          return 'Password must contain number';
        }
        return null;
      });

  // Cross-field validation - confirm password
  Stream<String?> get confirmPasswordError => Rx.combineLatest2(
        _passwordSubject.stream,
        _confirmPasswordSubject.stream,
        (password, confirm) {
          if (confirm.isEmpty) return 'Please confirm password';
          if (password != confirm) return 'Passwords do not match';
          return null;
        },
      ).debounceTime(const Duration(milliseconds: 300));

  // Combined validation - form is valid when all fields valid
  Stream<bool> get isFormValid => Rx.combineLatest3(
        emailError,
        passwordError,
        confirmPasswordError,
        (emailErr, passErr, confirmErr) {
          return emailErr == null && passErr == null && confirmErr == null;
        },
      );

  // Auto-save draft
  late final Stream<void> _autoSave;

  @override
  void initState() {
    super.initState();
    // Auto-save a draft once the form has been idle for 5 seconds
    _autoSave = Rx.combineLatest3(
      _emailSubject,
      _passwordSubject,
      _confirmPasswordSubject,
      (e, p, c) => {'email': e, 'password': p},
    )
        .debounceTime(const Duration(seconds: 5))
        .doOnData((data) {
          print('Auto-saving draft: $data');
          // Save to local storage
        });

    _autoSave.listen((_) {});
  }

  
  void dispose() {
    _emailSubject.close();
    _passwordSubject.close();
    _confirmPasswordSubject.close();
    super.dispose();
  }

  
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: const Text('Reactive Form')),
      body: Padding(
        padding: const EdgeInsets.all(16.0),
        child: Column(
          children: [
            // Email field
            StreamBuilder<String?>(
              stream: emailError,
              builder: (context, snapshot) {
                return TextField(
                  onChanged: _emailSubject.add,
                  decoration: InputDecoration(
                    labelText: 'Email',
                    errorText: snapshot.data,
                  ),
                );
              },
            ),
            const SizedBox(height: 16),

            // Password field
            StreamBuilder<String?>(
              stream: passwordError,
              builder: (context, snapshot) {
                return TextField(
                  onChanged: _passwordSubject.add,
                  obscureText: true,
                  decoration: InputDecoration(
                    labelText: 'Password',
                    errorText: snapshot.data,
                  ),
                );
              },
            ),
            const SizedBox(height: 16),

            // Confirm password field
            StreamBuilder<String?>(
              stream: confirmPasswordError,
              builder: (context, snapshot) {
                return TextField(
                  onChanged: _confirmPasswordSubject.add,
                  obscureText: true,
                  decoration: InputDecoration(
                    labelText: 'Confirm Password',
                    errorText: snapshot.data,
                  ),
                );
              },
            ),
            const SizedBox(height: 24),

            // Submit button
            StreamBuilder<bool>(
              stream: isFormValid,
              initialData: false,
              builder: (context, snapshot) {
                final isValid = snapshot.data ?? false;
                return ElevatedButton(
                  onPressed: isValid ? _submit : null,
                  child: const Text('Register'),
                );
              },
            ),
          ],
        ),
      ),
    );
  }

  void _submit() {
    print('Form submitted!');
    print('Email: ${_emailSubject.value}');
  }
}

Part 4: Firestore Reactive Patterns

10. Firestore Streams & Real-time Updates

# pubspec.yaml
dependencies:
  flutter:
    sdk: flutter
  cloud_firestore: ^4.14.0
  firebase_core: ^2.24.2
  rxdart: ^0.27.7

// firestore_reactive.dart
import 'package:cloud_firestore/cloud_firestore.dart';
import 'package:rxdart/rxdart.dart';

// Model
class Todo {
  final String id;
  final String title;
  final bool completed;
  final DateTime createdAt;

  Todo({
    required this.id,
    required this.title,
    required this.completed,
    required this.createdAt,
  });

  factory Todo.fromFirestore(DocumentSnapshot doc) {
    final data = doc.data() as Map<String, dynamic>;
    return Todo(
      id: doc.id,
      title: data['title'] ?? '',
      completed: data['completed'] ?? false,
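      // createdAt is null in local snapshots while serverTimestamp() is still pending
      createdAt: (data['createdAt'] as Timestamp?)?.toDate() ?? DateTime.now(),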
    );
  }

  Map<String, dynamic> toFirestore() {
    return {
      'title': title,
      'completed': completed,
      'createdAt': Timestamp.fromDate(createdAt),
    };
  }
}

class TodoRepository {
  final FirebaseFirestore _firestore = FirebaseFirestore.instance;

  CollectionReference get _todosCollection => _firestore.collection('todos');

  // Real-time stream - updates automatically when the data changes
  Stream<List<Todo>> watchTodos() {
    return _todosCollection
        .orderBy('createdAt', descending: true)
        .snapshots() // Real-time updates
        .map((snapshot) {
          return snapshot.docs.map((doc) => Todo.fromFirestore(doc)).toList();
        });
  }

  // One-time fetch - reads the data only once
  Future<List<Todo>> getTodos() async {
    final snapshot = await _todosCollection
        .orderBy('createdAt', descending: true)
        .get();
    return snapshot.docs.map((doc) => Todo.fromFirestore(doc)).toList();
  }

  // Watch single document
  Stream<Todo?> watchTodo(String id) {
    return _todosCollection.doc(id).snapshots().map((doc) {
      if (!doc.exists) return null;
      return Todo.fromFirestore(doc);
    });
  }

  // Query with a stream
  Stream<List<Todo>> watchCompletedTodos(bool completed) {
    return _todosCollection
        .where('completed', isEqualTo: completed)
        .snapshots()
        .map((snapshot) {
          return snapshot.docs.map((doc) => Todo.fromFirestore(doc)).toList();
        });
  }

  // Add todo
  Future<void> addTodo(String title) {
    return _todosCollection.add({
      'title': title,
      'completed': false,
      'createdAt': FieldValue.serverTimestamp(),
    });
  }

  // Update todo
  Future<void> updateTodo(String id, {String? title, bool? completed}) {
    final updates = <String, dynamic>{};
    if (title != null) updates['title'] = title;
    if (completed != null) updates['completed'] = completed;
    return _todosCollection.doc(id).update(updates);
  }

  // Delete todo
  Future<void> deleteTodo(String id) {
    return _todosCollection.doc(id).delete();
  }
}

11. Complex Stream Transformations

// complex_streams.dart
import 'package:cloud_firestore/cloud_firestore.dart';
import 'package:rxdart/rxdart.dart';

class User {
  final String id;
  final String name;

  User({required this.id, required this.name});

  factory User.fromFirestore(DocumentSnapshot doc) {
    final data = doc.data() as Map<String, dynamic>;
    return User(id: doc.id, name: data['name'] ?? '');
  }
}

class Post {
  final String id;
  final String userId;
  final String content;
  User? author;

  Post({required this.id, required this.userId, required this.content, this.author});

  factory Post.fromFirestore(DocumentSnapshot doc) {
    final data = doc.data() as Map<String, dynamic>;
    return Post(
      id: doc.id,
      userId: data['userId'] ?? '',
      content: data['content'] ?? '',
    );
  }
}

class ComplexStreamService {
  final FirebaseFirestore _firestore = FirebaseFirestore.instance;

  // Combining multiple streams - posts with user info
  Stream<List<Post>> watchPostsWithAuthors() {
    return _firestore.collection('posts').snapshots().switchMap((postsSnapshot) {
      final posts = postsSnapshot.docs.map((doc) => Post.fromFirestore(doc)).toList();

      if (posts.isEmpty) {
        return Stream.value(<Post>[]);
      }

      // Collect unique user IDs
      final userIds = posts.map((p) => p.userId).toSet().toList();

      // Stream users
      final userStreams = userIds.map((userId) {
        return _firestore.collection('users').doc(userId).snapshots();
      }).toList();

      // Combine all user streams
      return Rx.combineLatestList(userStreams).map((userSnapshots) {
        final userMap = <String, User>{};
        for (final snapshot in userSnapshots) {
          if (snapshot.exists) {
            final user = User.fromFirestore(snapshot);
            userMap[user.id] = user;
          }
        }

        // Attach the author to each post
        for (final post in posts) {
          post.author = userMap[post.userId];
        }

        return posts;
      });
    });
  }

  // Pagination with streams
  Stream<List<Post>> watchPostsPaginated({
    required int limit,
    DocumentSnapshot? startAfter,
  }) {
    Query query = _firestore
        .collection('posts')
        .orderBy('createdAt', descending: true)
        .limit(limit);

    if (startAfter != null) {
      query = query.startAfterDocument(startAfter);
    }

    return query.snapshots().map((snapshot) {
      return snapshot.docs.map((doc) => Post.fromFirestore(doc)).toList();
    });
  }

  // Search with debouncing
  Stream<List<Post>> searchPosts(Stream<String> queryStream) {
    return queryStream
        .debounceTime(const Duration(milliseconds: 300))
        .distinctUntilChanged()
        .switchMap((query) {
          if (query.isEmpty) {
            return Stream.value(<Post>[]);
          }

          // Firestore has no full-text search;
          // use startAt/endAt for a prefix search instead
          return _firestore
              .collection('posts')
              .orderBy('content')
              .startAt([query])
              .endAt(['$query\uf8ff'])
              .snapshots()
              .map((snapshot) {
                return snapshot.docs.map((doc) => Post.fromFirestore(doc)).toList();
              });
        });
  }

  // Offline/Online handling
  Stream<List<Post>> watchPostsWithConnectivity() {
    return _firestore
        .collection('posts')
        .snapshots(includeMetadataChanges: true)
        .map((snapshot) {
          final isFromCache = snapshot.metadata.isFromCache;
          print('Data source: ${isFromCache ? "cache" : "server"}');

          return snapshot.docs.map((doc) => Post.fromFirestore(doc)).toList();
        });
  }
}
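
The prefix search in searchPosts relies on string ordering: every string that starts with the query sorts between the query itself and the query followed by a very high code point. The sketch below shows that idea in plain Dart, without Firestore. PrefixRange, prefixRange, and inRange are hypothetical helper names introduced only for this illustration, and String.compareTo is a local stand-in that may not match Firestore's ordering for every Unicode string.

```dart
/// Inclusive lower and upper bounds for a prefix query.
/// (Hypothetical helper type, not part of cloud_firestore.)
class PrefixRange {
  const PrefixRange(this.start, this.end);

  final String start;
  final String end;
}

/// Builds the two values that would be passed to startAt/endAt for [prefix].
/// '\uf8ff' is a high code point in the Unicode Private Use Area, so
/// '$prefix\uf8ff' sorts after the usual strings that begin with [prefix].
PrefixRange prefixRange(String prefix) => PrefixRange(prefix, '$prefix\uf8ff');

/// Local approximation of the range check, assuming code-unit ordering.
/// String.compareTo compares UTF-16 code units lexicographically.
bool inRange(String value, PrefixRange range) =>
    value.compareTo(range.start) >= 0 && value.compareTo(range.end) <= 0;
```

With these helpers, inRange('flutter', prefixRange('flu')) is true, while inRange('fly', prefixRange('flu')) is false. The comparison is case-sensitive, so 'Flutter' falls outside the range for the query 'flu'; storing a lowercased copy of the field is one common way to work around that.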

Part 5: Performance & Best Practices

12. Optimization Techniques

// optimization.dart
import 'dart:async';

import 'package:flutter/material.dart';

// BAD: Unnecessary rebuilds
class BadExample extends StatefulWidget {
  const BadExample({super.key});

  @override
  State<BadExample> createState() => _BadExampleState();
}

class _BadExampleState extends State<BadExample> {
  final _controller = StreamController<int>.broadcast();

  @override
  Widget build(BuildContext context) {
    return Column(
      children: [
        // BAD: the whole inner Column rebuilds whenever the stream emits
        StreamBuilder<int>(
          stream: _controller.stream,
          builder: (context, snapshot) {
            return Column(
              children: [
                Text('Count: ${snapshot.data ?? 0}'),
                // This widget does not need the stream, yet it sits in the subtree
                // that is rebuilt on every event (here only const spares it a rebuild)
                const ExpensiveWidget(),
              ],
            );
          },
        ),
      ],
    );
  }

  @override
  void dispose() {
    _controller.close();
    super.dispose();
  }
}

// GOOD: Minimize rebuild scope
class GoodExample extends StatefulWidget {
  const GoodExample({super.key});

  @override
  State<GoodExample> createState() => _GoodExampleState();
}

class _GoodExampleState extends State<GoodExample> {
  final _controller = StreamController<int>.broadcast();

  @override
  Widget build(BuildContext context) {
    return Column(
      children: [
        // GOOD: only the Text rebuilds
        StreamBuilder<int>(
          stream: _controller.stream,
          builder: (context, snapshot) {
            return Text('Count: ${snapshot.data ?? 0}');
          },
        ),
        // Does not rebuild when the stream emits
        const ExpensiveWidget(),
      ],
    );
  }

  @override
  void dispose() {
    _controller.close();
    super.dispose();
  }
}

class ExpensiveWidget extends StatelessWidget {
  const ExpensiveWidget({super.key});

  @override
  Widget build(BuildContext context) {
    print('ExpensiveWidget rebuilt'); // Debug
    return Container(
      padding: const EdgeInsets.all(16),
      child: const Text('Expensive computation here'),
    );
  }
}

// Stream disposal - avoid memory leaks
class ProperDisposal extends StatefulWidget {
  const ProperDisposal({super.key});

  @override
  State<ProperDisposal> createState() => _ProperDisposalState();
}

class _ProperDisposalState extends State<ProperDisposal> {
  late final StreamController<int> _controller;
  StreamSubscription<int>? _subscription;

  @override
  void initState() {
    super.initState();
    _controller = StreamController<int>.broadcast();

    // Keep the subscription so it can be cancelled later
    _subscription = _controller.stream.listen((data) {
      print('Data: $data');
    });
  }

  @override
  void dispose() {
    // IMPORTANT: cancel the subscription first
    _subscription?.cancel();
    // Then close the controller
    _controller.close();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return const Placeholder();
  }
}

// Using const constructors
class ConstExample extends StatelessWidget {
  const ConstExample({super.key});

  @override
  Widget build(BuildContext context) {
    return Column(
      children: [
        // GOOD: const widgets are not rebuilt
        const Text('Static text'),
        const Icon(Icons.star),
        const SizedBox(height: 16),

        // BAD: a new instance is created on every build
        Text('Dynamic: ${DateTime.now()}'),
      ],
    );
  }
}

// Keys for widget identity
class KeysExample extends StatelessWidget {
  final List<String> items;

  const KeysExample({super.key, required this.items});

  @override
  Widget build(BuildContext context) {
    return ListView(
      children: items.map((item) {
        // GOOD: keys help Flutter identify widgets when the list changes
        return ListTile(
          key: ValueKey(item),
          title: Text(item),
        );
      }).toList(),
    );
  }
}

13. Testing Reactive Code

// test/reactive_test.dart
import 'dart:async';

import 'package:flutter/material.dart';
import 'package:flutter_test/flutter_test.dart';

// Service to test
class CounterService {
  final _controller = StreamController<int>.broadcast();
  int _count = 0;

  Stream<int> get stream => _controller.stream;
  int get current => _count;

  void increment() {
    _count++;
    _controller.add(_count);
  }

  void dispose() {
    _controller.close();
  }
}

// Widget to test
class CounterWidget extends StatelessWidget {
  final Stream<int> stream;

  const CounterWidget({super.key, required this.stream});

  @override
  Widget build(BuildContext context) {
    return StreamBuilder<int>(
      stream: stream,
      initialData: 0,
      builder: (context, snapshot) {
        return Text('Count: ${snapshot.data}');
      },
    );
  }
}

void main() {
  group('CounterService', () {
    late CounterService service;

    setUp(() {
      service = CounterService();
    });

    tearDown(() {
      service.dispose();
    });

    test('initial value is 0', () {
      expect(service.current, 0);
    });

    test('increment increases count', () {
      service.increment();
      expect(service.current, 1);
    });

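    test('stream emits new values', () async {
      // Register the expectation before emitting, because a broadcast
      // stream drops events that arrive while nobody is listening
      final expectation = expectLater(
        service.stream,
        emitsInOrder([1, 2, 3]),
      );

      service.increment();
      service.increment();
      service.increment();

      // Wait until the matcher has seen all three values
      await expectation;
    });

    test('stream emits error', () async {
      final controller = StreamController<int>();

      final expectation = expectLater(
        controller.stream,
        emitsError(isA<Exception>()),
      );

      controller.addError(Exception('Test error'));
      await controller.close();
      await expectation;
    });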
  });

  group('CounterWidget', () {
    testWidgets('displays stream data', (tester) async {
      final controller = StreamController<int>.broadcast();

      await tester.pumpWidget(
        MaterialApp(
          home: CounterWidget(stream: controller.stream),
        ),
      );

      // Initial state
      expect(find.text('Count: 0'), findsOneWidget);

      // Emit new value
      controller.add(5);
      await tester.pump();

      expect(find.text('Count: 5'), findsOneWidget);

      await controller.close();
    });

    testWidgets('handles stream errors', (tester) async {
      final controller = StreamController<int>.broadcast();

      await tester.pumpWidget(
        MaterialApp(
          home: StreamBuilder<int>(
            stream: controller.stream,
            builder: (context, snapshot) {
              if (snapshot.hasError) {
                return Text('Error: ${snapshot.error}');
              }
              return Text('Count: ${snapshot.data ?? 0}');
            },
          ),
        ),
      );

      controller.addError('Test error');
      await tester.pump();

      expect(find.text('Error: Test error'), findsOneWidget);

      await controller.close();
    });
  });
}

// Mock stream for testing
class MockDataService {
  Stream<List<String>> watchItems() {
    // Return fake data for testing
    return Stream.value(['Item 1', 'Item 2', 'Item 3']);
  }

  Stream<List<String>> watchItemsWithDelay() {
    return Stream.periodic(
      const Duration(milliseconds: 100),
      (i) => List.generate(i + 1, (j) => 'Item $j'),
    ).take(3);
  }
}

14. Common Pitfalls & Solutions

// pitfalls.dart
import 'dart:async';

import 'package:flutter/material.dart';

// PITFALL 1: Stream subscription leak
class LeakExample extends StatefulWidget {
  const LeakExample({super.key});

  @override
  State<LeakExample> createState() => _LeakExampleState();
}

class _LeakExampleState extends State<LeakExample> {
  // BAD: the subscription is not stored
  @override
  void initState() {
    super.initState();
    // Memory leak! Subscription never cancelled
    Stream.periodic(const Duration(seconds: 1)).listen((data) {
      print('Data: $data');
    });
  }

  @override
  Widget build(BuildContext context) => const Placeholder();
}

// SOLUTION: Proper subscription management
class NoLeakExample extends StatefulWidget {
  const NoLeakExample({super.key});

  @override
  State<NoLeakExample> createState() => _NoLeakExampleState();
}

class _NoLeakExampleState extends State<NoLeakExample> {
  StreamSubscription? _subscription;

  @override
  void initState() {
    super.initState();
    // GOOD: store the subscription
    _subscription = Stream.periodic(const Duration(seconds: 1)).listen((data) {
      print('Data: $data');
    });
  }

  @override
  void dispose() {
    // GOOD: cancel it on dispose
    _subscription?.cancel();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) => const Placeholder();
}

// PITFALL 2: Multiple listeners on single-subscription stream
void multipleListenersPitfall() {
  // BAD: this stream allows only one listener
  final stream = Stream.periodic(const Duration(seconds: 1), (i) => i).take(5);

  stream.listen((data) => print('Listener 1: $data'));
  // stream.listen((data) => print('Listener 2: $data')); // ERROR!
}

// SOLUTION: Use broadcast stream
void multipleListenersSolution() {
  // GOOD: a broadcast stream allows multiple listeners
  final controller = StreamController<int>.broadcast();

  controller.stream.listen((data) => print('Listener 1: $data'));
  controller.stream.listen((data) => print('Listener 2: $data'));

  controller.add(1);
  controller.close();
}

// PITFALL 3: setState after dispose
class SetStateAfterDispose extends StatefulWidget {
  const SetStateAfterDispose({super.key});

  @override
  State<SetStateAfterDispose> createState() => _SetStateAfterDisposeState();
}

class _SetStateAfterDisposeState extends State<SetStateAfterDispose> {
  StreamSubscription? _subscription;

  @override
  void initState() {
    super.initState();
    _subscription = fetchData().listen((data) {
      // BAD: calling setState() directly here may run after the widget is disposed
      // setState(() {});

      // GOOD: go through the mounted check below
      safeSetState(() {});
    });
  }

  // SOLUTION: Check mounted before setState
  void safeSetState(VoidCallback fn) {
    if (mounted) {
      setState(fn);
    }
  }

  @override
  void dispose() {
    _subscription?.cancel();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) => const Placeholder();
}

Stream<int> fetchData() async* {
  await Future.delayed(const Duration(seconds: 2));
  yield 1;
}

// PITFALL 4: Not handling all connection states
class IncompleteHandling extends StatelessWidget {
  final Stream<int> stream;

  const IncompleteHandling({super.key, required this.stream});

  @override
  Widget build(BuildContext context) {
    return StreamBuilder<int>(
      stream: stream,
      builder: (context, snapshot) {
        // BAD: the waiting state is not handled
        return Text('Data: ${snapshot.data}');
      },
    );
  }
}

// SOLUTION: Handle all states
class CompleteHandling extends StatelessWidget {
  final Stream<int> stream;

  const CompleteHandling({super.key, required this.stream});

  @override
  Widget build(BuildContext context) {
    return StreamBuilder<int>(
      stream: stream,
      builder: (context, snapshot) {
        // GOOD: handle all states
        if (snapshot.connectionState == ConnectionState.waiting) {
          return const CircularProgressIndicator();
        }

        if (snapshot.hasError) {
          return Text('Error: ${snapshot.error}');
        }

        if (!snapshot.hasData) {
          return const Text('No data');
        }

        return Text('Data: ${snapshot.data}');
      },
    );
  }
}

// Debugging tips
void debuggingStreams() {
  final controller = StreamController<int>.broadcast();

  // Debug with doOnData, doOnError, doOnDone (RxDart),
  // or with plain handleError and map

  controller.stream
      .map((data) {
        print('DEBUG: Received $data'); // Log every event
        return data;
      })
      .handleError((error) {
        print('DEBUG: Error $error');
      })
      .listen((data) {
        print('Final: $data');
      });

  controller.add(1);
  controller.close();
}

Part 6: Real-World Project - Reactive Todo App

15. Case Study: Building a Reactive Todo App

Architecture Overview:

┌─────────────────────────────────────────────────────────────┐
│                     PRESENTATION LAYER                      │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐          │
│  │  TodoList   │  │  TodoItem   │  │ SearchBar   │          │
│  │   Widget    │  │   Widget    │  │   Widget    │          │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘          │
└─────────┼────────────────┼────────────────┼─────────────────┘
          │                │                │
          ▼                ▼                ▼
┌─────────────────────────────────────────────────────────────┐
│                   STATE LAYER (Riverpod)                    │
│  ┌─────────────────────────────────────────────────────┐    │
│  │              TodoNotifier (StateNotifier)            │    │
│  │  - todos: List<Todo>                                 │    │
│  │  - filter: TodoFilter                                │    │
│  │  - searchQuery: String                               │    │
│  └──────────────────────┬──────────────────────────────┘    │
└─────────────────────────┼───────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│                      REPOSITORY LAYER                       │
│  ┌─────────────────────────────────────────────────────┐    │
│  │              TodoRepository                          │    │
│  │  - watchTodos(): Stream<List<Todo>>                  │    │
│  │  - addTodo(), updateTodo(), deleteTodo()            │    │
│  └──────────────────────┬──────────────────────────────┘    │
└─────────────────────────┼───────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│                         DATA LAYER                          │
│  ┌─────────────────────────────────────────────────────┐    │
│  │              Firestore / Local Storage               │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘

Complete Implementation:

# pubspec.yaml
name: reactive_todo_app
description: A reactive todo app with Riverpod and Firestore

dependencies:
  flutter:
    sdk: flutter
  flutter_riverpod: ^2.4.9
  cloud_firestore: ^4.14.0
  firebase_core: ^2.24.2
  rxdart: ^0.27.7
  uuid: ^4.2.2

// lib/models/todo.dart
import 'package:cloud_firestore/cloud_firestore.dart';

enum TodoPriority { low, medium, high }

class Todo {
  final String id;
  final String title;
  final String? description;
  final bool completed;
  final TodoPriority priority;
  final DateTime createdAt;
  final DateTime? completedAt;

  Todo({
    required this.id,
    required this.title,
    this.description,
    required this.completed,
    required this.priority,
    required this.createdAt,
    this.completedAt,
  });

  // From a Firestore document
  factory Todo.fromFirestore(DocumentSnapshot doc) {
    final data = doc.data() as Map<String, dynamic>;
    return Todo(
      id: doc.id,
      title: data['title'] ?? '',
      description: data['description'],
      completed: data['completed'] ?? false,
      priority: TodoPriority.values[data['priority'] ?? 1],
      createdAt: (data['createdAt'] as Timestamp?)?.toDate() ?? DateTime.now(),
      completedAt: (data['completedAt'] as Timestamp?)?.toDate(),
    );
  }

  // Convert to a Map for Firestore
  Map<String, dynamic> toFirestore() {
    return {
      'title': title,
      'description': description,
      'completed': completed,
      'priority': priority.index,
      'createdAt': Timestamp.fromDate(createdAt),
      'completedAt': completedAt != null ? Timestamp.fromDate(completedAt!) : null,
    };
  }

  // Copy with changes
  Todo copyWith({
    String? title,
    String? description,
    bool? completed,
    TodoPriority? priority,
    DateTime? completedAt,
  }) {
    return Todo(
      id: id,
      title: title ?? this.title,
      description: description ?? this.description,
      completed: completed ?? this.completed,
      priority: priority ?? this.priority,
      createdAt: createdAt,
      completedAt: completedAt ?? this.completedAt,
    );
  }
}

// lib/repositories/todo_repository.dart
import 'package:cloud_firestore/cloud_firestore.dart';
import 'package:rxdart/rxdart.dart';
import '../models/todo.dart';

class TodoRepository {
  final FirebaseFirestore _firestore;
  final String _userId;

  TodoRepository({
    required FirebaseFirestore firestore,
    required String userId,
  })  : _firestore = firestore,
        _userId = userId;

  // Collection reference for the user's todos
  CollectionReference get _todosRef =>
      _firestore.collection('users').doc(_userId).collection('todos');

  // Real-time stream of todos
  Stream<List<Todo>> watchTodos() {
    return _todosRef
        .orderBy('createdAt', descending: true)
        .snapshots()
        .map((snapshot) {
      return snapshot.docs.map((doc) => Todo.fromFirestore(doc)).toList();
    }).doOnError((error, stackTrace) {
      // Log the error and let it propagate so listeners can react to it
      // (handleError would swallow it and ignore any returned value)
      print('Error watching todos: $error');
    });
  }

  // Stream with search and filter
  Stream<List<Todo>> watchFilteredTodos({
    String? searchQuery,
    bool? completed,
  }) {
    Query query = _todosRef.orderBy('createdAt', descending: true);

    if (completed != null) {
      query = query.where('completed', isEqualTo: completed);
    }

    return query.snapshots().map((snapshot) {
      var todos = snapshot.docs.map((doc) => Todo.fromFirestore(doc)).toList();

      // Client-side search (Firestore does not support full-text search)
      if (searchQuery != null && searchQuery.isNotEmpty) {
        final lowerQuery = searchQuery.toLowerCase();
        todos = todos.where((todo) {
          return todo.title.toLowerCase().contains(lowerQuery) ||
              (todo.description?.toLowerCase().contains(lowerQuery) ?? false);
        }).toList();
      }

      return todos;
    });
  }

  // Add a todo, with optimistic update support
  Future<String> addTodo({
    required String title,
    String? description,
    TodoPriority priority = TodoPriority.medium,
  }) async {
    final docRef = await _todosRef.add({
      'title': title,
      'description': description,
      'completed': false,
      'priority': priority.index,
      'createdAt': FieldValue.serverTimestamp(),
      'completedAt': null,
    });
    return docRef.id;
  }

  // Update todo
  Future<void> updateTodo(String id, Map<String, dynamic> updates) {
    return _todosRef.doc(id).update(updates);
  }

  // Toggle completed status
  Future<void> toggleCompleted(String id, bool completed) {
    return _todosRef.doc(id).update({
      'completed': completed,
      'completedAt': completed ? FieldValue.serverTimestamp() : null,
    });
  }

  // Delete todo
  Future<void> deleteTodo(String id) {
    return _todosRef.doc(id).delete();
  }

  // Restore a deleted todo, for undo/redo
  Future<void> restoreTodo(Todo todo) {
    return _todosRef.doc(todo.id).set(todo.toFirestore());
  }
}

// lib/providers/todo_provider.dart
import 'package:cloud_firestore/cloud_firestore.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:rxdart/rxdart.dart';
import '../models/todo.dart';
import '../repositories/todo_repository.dart';

// Filter enum
enum TodoFilter { all, active, completed }

// State for the todo list
class TodoState {
  final List<Todo> todos;
  final TodoFilter filter;
  final String searchQuery;
  final bool isLoading;
  final String? error;
  final List<Todo> deletedTodos; // For undo

  const TodoState({
    this.todos = const [],
    this.filter = TodoFilter.all,
    this.searchQuery = '',
    this.isLoading = false,
    this.error,
    this.deletedTodos = const [],
  });

  // Filtered todos
  List<Todo> get filteredTodos {
    var result = todos;

    // Apply filter
    switch (filter) {
      case TodoFilter.active:
        result = result.where((t) => !t.completed).toList();
        break;
      case TodoFilter.completed:
        result = result.where((t) => t.completed).toList();
        break;
      case TodoFilter.all:
        break;
    }

    // Apply search
    if (searchQuery.isNotEmpty) {
      final query = searchQuery.toLowerCase();
      result = result.where((t) {
        return t.title.toLowerCase().contains(query) ||
            (t.description?.toLowerCase().contains(query) ?? false);
      }).toList();
    }

    return result;
  }

  // Stats
  int get activeCount => todos.where((t) => !t.completed).length;
  int get completedCount => todos.where((t) => t.completed).length;

  TodoState copyWith({
    List<Todo>? todos,
    TodoFilter? filter,
    String? searchQuery,
    bool? isLoading,
    String? error,
    List<Todo>? deletedTodos,
  }) {
    return TodoState(
      todos: todos ?? this.todos,
      filter: filter ?? this.filter,
      searchQuery: searchQuery ?? this.searchQuery,
      isLoading: isLoading ?? this.isLoading,
      error: error,
      deletedTodos: deletedTodos ?? this.deletedTodos,
    );
  }
}

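// StateNotifier for todo management
class TodoNotifier extends StateNotifier<TodoState> {
  final TodoRepository _repository;
  // Holds the repository subscription so it can be cancelled in dispose()
  final _subscriptions = CompositeSubscription();

  TodoNotifier(this._repository) : super(const TodoState(isLoading: true)) {
    _init();
  }

  void _init() {
    // Subscribe to repository stream
    _subscriptions.add(
      _repository.watchTodos().listen(
        (todos) {
          state = state.copyWith(todos: todos, isLoading: false);
        },
        onError: (error) {
          state = state.copyWith(error: error.toString(), isLoading: false);
        },
      ),
    );
  }

  @override
  void dispose() {
    // Cancel the stream subscription when the provider is disposed
    _subscriptions.dispose();
    super.dispose();
  }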

  // Set filter
  void setFilter(TodoFilter filter) {
    state = state.copyWith(filter: filter);
  }

  // Set search query (debounce the input upstream if needed)
  void setSearchQuery(String query) {
    state = state.copyWith(searchQuery: query);
  }

  // Add todo
  Future<void> addTodo({
    required String title,
    String? description,
    TodoPriority priority = TodoPriority.medium,
  }) async {
    try {
      await _repository.addTodo(
        title: title,
        description: description,
        priority: priority,
      );
    } catch (e) {
      state = state.copyWith(error: e.toString());
    }
  }

  // Toggle completed
  Future<void> toggleCompleted(String id) async {
    final todo = state.todos.firstWhere((t) => t.id == id);
    try {
      await _repository.toggleCompleted(id, !todo.completed);
    } catch (e) {
      state = state.copyWith(error: e.toString());
    }
  }

  // Delete with undo support
  Future<void> deleteTodo(String id) async {
    final todo = state.todos.firstWhere((t) => t.id == id);

    // Keep it for undo
    state = state.copyWith(
      deletedTodos: [...state.deletedTodos, todo],
    );

    try {
      await _repository.deleteTodo(id);
    } catch (e) {
      // Roll back on error
      state = state.copyWith(
        deletedTodos: state.deletedTodos.where((t) => t.id != id).toList(),
        error: e.toString(),
      );
    }
  }

  // Undo delete
  Future<void> undoDelete() async {
    if (state.deletedTodos.isEmpty) return;

    final todo = state.deletedTodos.last;
    try {
      await _repository.restoreTodo(todo);
      state = state.copyWith(
        deletedTodos: state.deletedTodos.sublist(0, state.deletedTodos.length - 1),
      );
    } catch (e) {
      state = state.copyWith(error: e.toString());
    }
  }

  // Clear error
  void clearError() {
    state = state.copyWith(error: null);
  }
}

// Providers
final todoRepositoryProvider = Provider<TodoRepository>((ref) {
  // In a real app, userId would come from an auth provider
  return TodoRepository(
    firestore: FirebaseFirestore.instance,
    userId: 'demo_user',
  );
});

final todoProvider = StateNotifierProvider<TodoNotifier, TodoState>((ref) {
  final repository = ref.watch(todoRepositoryProvider);
  return TodoNotifier(repository);
});

// Derived providers
final filteredTodosProvider = Provider<List<Todo>>((ref) {
  return ref.watch(todoProvider).filteredTodos;
});

final todoStatsProvider = Provider<Map<String, int>>((ref) {
  final state = ref.watch(todoProvider);
  return {'active': state.activeCount, 'completed': state.completedCount};
});

// lib/screens/todo_screen.dart
import 'package:flutter/material.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import '../models/todo.dart';
import '../providers/todo_provider.dart';

class TodoScreen extends ConsumerWidget {
  const TodoScreen({super.key});

  @override
  Widget build(BuildContext context, WidgetRef ref) {
    final state = ref.watch(todoProvider);

    // Show error snackbar
    ref.listen<TodoState>(todoProvider, (previous, next) {
      if (next.error != null && previous?.error != next.error) {
        ScaffoldMessenger.of(context).showSnackBar(
          SnackBar(
            content: Text(next.error!),
            action: SnackBarAction(
              label: 'Dismiss',
              onPressed: () {
                ref.read(todoProvider.notifier).clearError();
              },
            ),
          ),
        );
      }
    });

    return Scaffold(
      appBar: AppBar(
        title: const Text('Reactive Todo'),
        actions: [
          // Undo button
          if (state.deletedTodos.isNotEmpty)
            IconButton(
              icon: const Icon(Icons.undo),
              onPressed: () {
                ref.read(todoProvider.notifier).undoDelete();
              },
            ),
        ],
      ),
      body: Column(
        children: [
          // Search bar
          const _SearchBar(),

          // Filter tabs
          const _FilterTabs(),

          // Stats
          const _TodoStats(),

          // Todo list
          Expanded(
            child: state.isLoading
                ? const Center(child: CircularProgressIndicator())
                : const _TodoList(),
          ),
        ],
      ),
      floatingActionButton: FloatingActionButton(
        onPressed: () => _showAddDialog(context, ref),
        child: const Icon(Icons.add),
      ),
    );
  }

  void _showAddDialog(BuildContext context, WidgetRef ref) {
    final titleController = TextEditingController();
    final descController = TextEditingController();

    showDialog(
      context: context,
      builder: (context) => AlertDialog(
        title: const Text('Add Todo'),
        content: Column(
          mainAxisSize: MainAxisSize.min,
          children: [
            TextField(
              controller: titleController,
              decoration: const InputDecoration(labelText: 'Title'),
              autofocus: true,
            ),
            TextField(
              controller: descController,
              decoration: const InputDecoration(labelText: 'Description'),
            ),
          ],
        ),
        actions: [
          TextButton(
            onPressed: () => Navigator.pop(context),
            child: const Text('Cancel'),
          ),
          ElevatedButton(
            onPressed: () {
              if (titleController.text.isNotEmpty) {
                ref.read(todoProvider.notifier).addTodo(
                      title: titleController.text,
                      description: descController.text.isNotEmpty
                          ? descController.text
                          : null,
                    );
                Navigator.pop(context);
              }
            },
            child: const Text('Add'),
          ),
        ],
      ),
    );
  }
}

class _SearchBar extends ConsumerWidget {
  const _SearchBar();

  @override
  Widget build(BuildContext context, WidgetRef ref) {
    return Padding(
      padding: const EdgeInsets.all(16),
      child: TextField(
        decoration: InputDecoration(
          hintText: 'Search todos...',
          prefixIcon: const Icon(Icons.search),
          border: OutlineInputBorder(
            borderRadius: BorderRadius.circular(8),
          ),
        ),
        onChanged: (value) {
          ref.read(todoProvider.notifier).setSearchQuery(value);
        },
      ),
    );
  }
}

class _FilterTabs extends ConsumerWidget {
  const _FilterTabs();

  @override
  Widget build(BuildContext context, WidgetRef ref) {
    final currentFilter = ref.watch(todoProvider).filter;

    return Row(
      mainAxisAlignment: MainAxisAlignment.center,
      children: TodoFilter.values.map((filter) {
        final isSelected = filter == currentFilter;
        return Padding(
          padding: const EdgeInsets.symmetric(horizontal: 4),
          child: FilterChip(
            selected: isSelected,
            label: Text(filter.name.toUpperCase()),
            onSelected: (_) {
              ref.read(todoProvider.notifier).setFilter(filter);
            },
          ),
        );
      }).toList(),
    );
  }
}

class _TodoStats extends ConsumerWidget {
  const _TodoStats();

  @override
  Widget build(BuildContext context, WidgetRef ref) {
    final stats = ref.watch(todoStatsProvider);

    return Padding(
      padding: const EdgeInsets.all(8),
      child: Text(
        '${stats['active']} active • ${stats['completed']} completed',
        style: Theme.of(context).textTheme.bodySmall,
      ),
    );
  }
}

class _TodoList extends ConsumerWidget {
  const _TodoList();

  @override
  Widget build(BuildContext context, WidgetRef ref) {
    final todos = ref.watch(filteredTodosProvider);

    if (todos.isEmpty) {
      return const Center(
        child: Text('No todos found'),
      );
    }

    return ListView.builder(
      itemCount: todos.length,
      itemBuilder: (context, index) {
        return _TodoItem(todo: todos[index]);
      },
    );
  }
}

class _TodoItem extends ConsumerWidget {
  final Todo todo;

  const _TodoItem({required this.todo});

  @override
  Widget build(BuildContext context, WidgetRef ref) {
    return Dismissible(
      key: ValueKey(todo.id),
      direction: DismissDirection.endToStart,
      background: Container(
        alignment: Alignment.centerRight,
        padding: const EdgeInsets.only(right: 16),
        color: Colors.red,
        child: const Icon(Icons.delete, color: Colors.white),
      ),
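      onDismissed: (_) {
        // Capture these while the item is still mounted. The snackbar's
        // Undo callback runs after this widget has left the tree, when
        // ref and context are no longer safe to use.
        final notifier = ref.read(todoProvider.notifier);
        final messenger = ScaffoldMessenger.of(context);

        notifier.deleteTodo(todo.id);

        // Show undo snackbar
        messenger.showSnackBar(
          SnackBar(
            content: Text('Deleted "${todo.title}"'),
            action: SnackBarAction(
              label: 'Undo',
              onPressed: () {
                notifier.undoDelete();
              },
            ),
          ),
        );
      },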
      child: ListTile(
        leading: Checkbox(
          value: todo.completed,
          onChanged: (_) {
            ref.read(todoProvider.notifier).toggleCompleted(todo.id);
          },
        ),
        title: Text(
          todo.title,
          style: TextStyle(
            decoration: todo.completed ? TextDecoration.lineThrough : null,
          ),
        ),
        subtitle: todo.description != null ? Text(todo.description!) : null,
        trailing: _PriorityBadge(priority: todo.priority),
      ),
    );
  }
}

class _PriorityBadge extends StatelessWidget {
  final TodoPriority priority;

  const _PriorityBadge({required this.priority});

  @override
  Widget build(BuildContext context) {
    Color color;
    switch (priority) {
      case TodoPriority.high:
        color = Colors.red;
        break;
      case TodoPriority.medium:
        color = Colors.orange;
        break;
      case TodoPriority.low:
        color = Colors.green;
        break;
    }

    return Container(
      padding: const EdgeInsets.symmetric(horizontal: 8, vertical: 4),
      decoration: BoxDecoration(
        color: color.withOpacity(0.2),
        borderRadius: BorderRadius.circular(4),
      ),
      child: Text(
        priority.name.toUpperCase(),
        style: TextStyle(color: color, fontSize: 10),
      ),
    );
  }
}

// lib/main.dart
import 'package:flutter/material.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:firebase_core/firebase_core.dart';
import 'screens/todo_screen.dart';

Future<void> main() async {
  WidgetsFlutterBinding.ensureInitialized();
  await Firebase.initializeApp();

  runApp(
    const ProviderScope(
      child: TodoApp(),
    ),
  );
}

class TodoApp extends StatelessWidget {
  const TodoApp({super.key});

  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      title: 'Reactive Todo',
      theme: ThemeData(
        colorScheme: ColorScheme.fromSeed(seedColor: Colors.blue),
        useMaterial3: true,
      ),
      home: const TodoScreen(),
    );
  }
}
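
The search box in this example calls `setSearchQuery` on every keystroke. If that ever becomes too chatty, a debounce can sit between the text field and the notifier. Below is a minimal sketch in plain Dart using `Timer` from `dart:async`. The `Debouncer` class and the 50 ms delay are assumptions made for illustration; they are not part of the todo app above or of any package.

```dart
import 'dart:async';

/// Hypothetical helper: runs only the most recent action, and only after
/// [delay] has passed without another call to [run].
class Debouncer {
  Debouncer(this.delay);

  final Duration delay;
  Timer? _timer;

  void run(void Function() action) {
    // Drop the pending action, if any, and start waiting again.
    _timer?.cancel();
    _timer = Timer(delay, action);
  }

  void dispose() => _timer?.cancel();
}

Future<void> main() async {
  final applied = <String>[];
  final debouncer = Debouncer(const Duration(milliseconds: 50));

  // Simulate three quick keystrokes.
  for (final query in ['f', 'fl', 'flu']) {
    debouncer.run(() => applied.add(query));
  }

  // Wait longer than the debounce delay so the last action can fire.
  await Future<void>.delayed(const Duration(milliseconds: 150));
  print(applied); // [flu]

  debouncer.dispose();
}
```

In the app, the `onChanged` callback of the search field could pass `() => notifier.setSearchQuery(value)` to `run`, with the notifier read once beforehand. Something would then need to own the `Debouncer` and call `dispose()` on it, for example a `ConsumerStatefulWidget`.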

Conclusion

Summary of Patterns

┌─────────────────────────────────────────────────────────────┐
│ REACTIVE PATTERNS SUMMARY                                   │
├──────────────────────┬──────────────────────────────────────┤
│ Pattern              │ Use When                             │
├──────────────────────┼──────────────────────────────────────┤
│ Stream/StreamBuilder │ Real-time data, async sequences      │
│ ValueNotifier        │ Simple single values, local state    │
│ Provider             │ Shared state, dependency injection   │
│ Riverpod             │ Modern apps, testability, scoping    │
│ RxDart               │ Complex transformations, operators   │
│ Firestore Streams    │ Real-time database sync              │
└──────────────────────┴──────────────────────────────────────┘

When to Use Which Approach

  • Streams: Real-time data, multiple async events
  • ValueNotifier: Simple local state, toggle buttons, form fields
  • Provider: Shared state between widgets, medium complexity apps
  • Riverpod: Large apps, testability needed, complex dependencies
  • RxDart: Complex data transformations, multiple stream combination
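
The testability point for Riverpod mostly comes down to provider overrides: a test can swap a dependency for a fake without touching the code that reads it. Here is a minimal sketch using the `riverpod` package on its own, without Flutter. `TitleRepository`, `FakeTitleRepository`, and both providers are hypothetical names invented for this example; they stand in for something like the `TodoRepository` provider used in the todo app.

```dart
import 'package:riverpod/riverpod.dart';

// Hypothetical repository interface, standing in for a real data source.
abstract class TitleRepository {
  List<String> loadTitles();
}

// Hypothetical fake used only in tests.
class FakeTitleRepository implements TitleRepository {
  @override
  List<String> loadTitles() => ['write tests'];
}

// The app would return a real implementation here; this sketch has none,
// so reading the provider without an override throws.
final titleRepositoryProvider = Provider<TitleRepository>((ref) {
  throw UnimplementedError('Override this provider with a repository');
});

// Derived provider that depends on the repository.
final titleCountProvider = Provider<int>((ref) {
  return ref.watch(titleRepositoryProvider).loadTitles().length;
});

void main() {
  // A container with the repository swapped for the fake.
  final container = ProviderContainer(
    overrides: [
      titleRepositoryProvider.overrideWithValue(FakeTitleRepository()),
    ],
  );

  print(container.read(titleCountProvider)); // 1

  container.dispose();
}
```

The derived provider never learns that it is reading a fake, which is what keeps dependency-heavy code testable as an app grows.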

Migration Path

  1. Start with setState for simple cases
  2. Move to ValueNotifier for local reactive state
  3. Use Provider for shared state
  4. Adopt Riverpod for testability and scalability
  5. Add RxDart for complex stream operations

Resources