- Published on
Mastering RxDart: Solving Complex Streams in Flutter
- Authors

- Name
- Phat Tran
When building a standard Flutter application, native Dart Stream and StreamBuilder are usually enough. However, when you start building highly dynamic, real-time applications—like a Banking App where data changes continuously—native Streams can quickly become complicated and full of boilerplate.
This is where RxDart comes to the rescue.
What is RxDart?
RxDart is an implementation of the ReactiveX API for Dart. Simply put, it extends native Dart Streams by adding powerful operators (functions that allow you to filter, merge, delay, or transform data streams with minimal code).
In RxDart, standard Dart StreamControllers are replaced by Subjects (BehaviorSubject, PublishSubject, ReplaySubject), which provide more control over how stream events are cached and delivered to listeners.
When should you use it?
You should reach for RxDart when you face these specific problems:
- Time-based filtering: You need to delay, throttle, or debounce user inputs.
- Stream combining: You need to wait for multiple independent streams to emit data before updating the UI.
- Advanced transformations: You need to cancel previous asynchronous requests when a new one arrives.
Let's look at three practical scenarios in a Digital Banking App where RxDart outshines native solutions.
Use Case 1: Beneficiary Search (Debounce & Cancel)
The Problem: In your banking app, users can search for a recipient by name or phone number. If you call the API on every single keystroke, you will spam your backend, burn through API rate limits, and potentially display out-of-order results if an older request resolves after a newer one.
The RxDart Solution: Use debounceTime to wait until the user stops typing, and switchMap to automatically cancel any pending API requests if the user types something new.
import 'package:flutter/material.dart';
import 'package:rxdart/rxdart.dart';
class BeneficiarySearch extends StatefulWidget {
_BeneficiarySearchState createState() => _BeneficiarySearchState();
}
class _BeneficiarySearchState extends State<BeneficiarySearch> {
// BehaviorSubject remembers the latest value
final _searchSubject = BehaviorSubject<String>();
late Stream<List<String>> _searchResults;
void initState() {
super.initState();
_searchResults = _searchSubject.stream
// Wait for 500ms of typing silence
.debounceTime(const Duration(milliseconds: 500))
// Ignore if the text hasn't actually changed
.distinctUntilChanged()
// Cancel previous API call if a new one starts
.switchMap((query) {
if (query.isEmpty) return Stream.value([]);
return Stream.fromFuture(_fetchBeneficiariesFromApi(query));
});
}
Future<List<String>> _fetchBeneficiariesFromApi(String query) async {
print('Fetching API for: $query');
await Future.delayed(const Duration(seconds: 1)); // Simulate network
return ['Alex ($query)', 'Anna ($query)'];
}
void dispose() {
_searchSubject.close();
super.dispose();
}
Widget build(BuildContext context) {
return Column(
children: [
TextField(
onChanged: _searchSubject.add, // Push new keystrokes to the stream
decoration: const InputDecoration(labelText: 'Search phone or name...'),
),
Expanded(
child: StreamBuilder<List<String>>(
stream: _searchResults,
builder: (context, snapshot) {
if (!snapshot.hasData) return const Text('Type to search...');
return ListView(
children: snapshot.data!.map((s) => ListTile(title: Text(s))).toList(),
);
},
),
),
],
);
}
}
Use Case 2: Total Account Balance (Merging Streams)
The Problem: A user's total balance is the sum of their Checking Account, Savings Account, and Credit Card balance. Each of these values comes from a different API or WebSocket and updates independently in real-time. How do you calculate the total sum every time any of them changes?
The RxDart Solution: Use Rx.combineLatest to listen to all three streams simultaneously. Whenever one stream updates, it grabs the latest values from all streams and recalculates the total.
import 'package:rxdart/rxdart.dart';
class BalanceBloc {
// Mock streams representing real-time database connections
final Stream<double> checkingStream = Stream.periodic(const Duration(seconds: 5), (_) => 2500.0);
final Stream<double> savingsStream = Stream.periodic(const Duration(seconds: 2), (i) => 10000.0 + (i * 10));
final Stream<double> creditStream = Stream.periodic(const Duration(seconds: 10), (_) => -500.0);
// The merged stream
Stream<double> get totalBalanceStream {
return Rx.combineLatest3(
checkingStream,
savingsStream,
creditStream,
(double checking, double savings, double credit) {
// This recalculates every time ANY of the 3 streams emit a new value
return checking + savings + credit;
},
);
}
}
Use Case 3: Preventing Duplicate Money Transfers (Throttling)
The Problem: A user is on the transfer confirmation screen. They frantically tap the "Transfer Money" button 5 times because the network is slow. You don't want to execute 5 transfers and accidentally drain their account.
The RxDart Solution: Use exhaustMap or throttleTime. While throttleTime simply ignores taps for a set duration, exhaustMap is even better for API calls: it completely ignores all new taps until the current API request finishes processing.
import 'package:flutter/material.dart';
import 'package:rxdart/rxdart.dart';
class TransferButton extends StatefulWidget {
_TransferButtonState createState() => _TransferButtonState();
}
class _TransferButtonState extends State<TransferButton> {
final _transferActionSubject = PublishSubject<void>();
void initState() {
super.initState();
_transferActionSubject.stream
// exhaustMap ignores all incoming events until the Future completes
.exhaustMap((_) => Stream.fromFuture(_executeTransfer()))
.listen((result) {
print('Transfer successful: $result');
});
}
Future<String> _executeTransfer() async {
print('Processing transfer... (ignoring other taps)');
await Future.delayed(const Duration(seconds: 2)); // Simulate API processing
return 'Transferred \$100';
}
void dispose() {
_transferActionSubject.close();
super.dispose();
}
Widget build(BuildContext context) {
return ElevatedButton(
// Push event to stream instead of directly calling the API
onPressed: () => _transferActionSubject.add(null),
child: const Text('Transfer Money'),
);
}
}
Conclusion
RxDart is an incredibly powerful tool, but it shouldn't be used for everything. For simple UI state toggles, stick to ValueNotifier. For global state, use Riverpod.
However, when you are building complex systems—like multi-account dashboards, real-time transaction feeds, or advanced search bars—RxDart's operators (debounceTime, switchMap, combineLatest, and exhaustMap) will save you hundreds of lines of messy, bug-prone code.