How to break up Dart Isolate code to avoid blocking the event queue?
I am writing a test program to explore the use of Isolates in Dart/Flutter. One type of isolate that I have is started and stopped using a switch on a Flutter UI. This sends a message (START and STOP) to the isolate and I use a ValueNotifier
to detect these commands and respond to them. When I initially wrote this, the Isolate ran continuously and didn’t respond the to the STOP command, which I understand is because the event queue would never be empty to process it.
Based on the first answer to this thread... How to terminate a long running isolate #2
… and the suggested approach on this blog page: https://hackernoon.com/executing-heavy-tasks-without-blocking-the-main-thread-on-flutter-6mx31lh
… I have tried to break my code up using Futures. I have split my run block into runWorker()
and runChunk()
functions, with runChunk
called repeatedly as long as the Isolate should be running (run == true
). I am doing something wrong because the Isolate still runs away and does not process the STOP command. I get slightly different results depending on whether I call runChunk
directly or using Future.delayed
(as per the hackernoon blog page), but neither approach works.
I believe that the STOP code works because if I remove the processing loop then everything triggers as expected, and I had an earlier version of this that worked when I included a 'Future.delayed' of 1 microsecond between each counter loop. So assume I am just using Futures incorrectly and not freeing up the event queue between 'runChunk' calls.
Can anyone tell me what I am doing wrong here? Here is the code for my isolate...
import 'dart:async';
import 'dart:isolate';
import 'package:flutter/material.dart';
class ContinuousIsolator {
ContinuousIsolator(
{required int channel, required void Function(double) setCounter}) {
print('Isolator initialisation');
_channel = channel;
_setCounter = setCounter;
spawn();
}
late int _channel; // The id of this isolate
late void Function(double) _setCounter;
late SendPort _port;
late Isolate _isolate;
ReceivePort receivePort = ReceivePort();
// Spawn a new isolate to complete the countdown (or up)
// channel = the number of the isolate
// counter = the value to count down from (or up to)
void spawn() {
print('Isolator establishing receiver');
receivePort.listen((msg) {
// print('Isolator message received');
// Unpack the map from the returned string
Map<int, dynamic> map = Map<int, dynamic>.from(msg);
// There should be only one key:value pair
for (var key in map.keys) {
msg = map[key]; // Extract the message
}
// print('Channel $_channel received "$msg" of type ${msg.runtimeType}');
// If we have received a Sendport, then add it to the port map
if (msg is SendPort) {
_port = msg;
} else {
// Otherwise process the message
// If it contains 'END' then we need to terminate the isolate
switch (msg) {
case 'END':
_isolate.kill();
// Isolate has completed, then close this receiver port
receivePort.close();
break;
default:
_setCounter(msg); // Send message to display
break;
}
}
});
// Start the isolate then let's get working on the countdown timer
Isolate.spawn(worker, {_channel: receivePort.sendPort}).then((isolate) {
_isolate = isolate; // Capture isolate so we can kill it later
});
}
void run() {
print('Sending START to worker');
_port.send('START');
}
void stop() {
print('Sending STOP to worker');
_port.send('STOP');
}
void end() {
_port.send('END'); // Send counter value to start countdown
}
}
void worker(Map<int, dynamic> args) {
int? id; // Number for this channel
ReceivePort receivePort = ReceivePort(); // Receive port for
SendPort? sendPort;
ValueNotifier<String> message = ValueNotifier('');
const double start = 10000000;
double counter = start;
const int chunkSize = 1000;
bool down = true;
bool run = true;
// Unpack the args to get the id and sendPort.
// There should be only one key:value pair
dynamic msg = '';
Map<int, dynamic> map = Map<int, dynamic>.from(args);
for (var key in map.keys) {
id = key; // Extract the isolate id
msg = map[key]; // Extract the message
}
// First message should contain the receivePort for the main isolate
if (msg is SendPort) {
sendPort = msg;
// print('args: $args port: $sendPort');
print('worker $id sending send port');
sendPort.send({id: receivePort.sendPort});
}
double getCounter() {
return counter;
}
void setCounter(double value) {
counter = value;
}
bool getDown() {
return down;
}
void setDown(bool value) {
down = value;
}
Future runChunk(
int chunkSize,
bool Function() getDown,
void Function(bool) setDown,
double Function() getCounter,
void Function(double) setCounter) {
const double start = 10000000;
print('Running chunk, counter is ${getCounter()}');
for (int i = 0; i < chunkSize; i++) {
// print('Worker $id in the while loop');
if (getDown() == true) {
setCounter(getCounter() - 1);
if (getCounter() < 0) setDown(!getDown());
} else {
setCounter(getCounter() + 1);
if (getCounter() > start) setDown(!getDown());
}
if ((getCounter() ~/ 1000) == getCounter() / 1000) {
// print('Continuous Counter is ${getCounter()}');
sendPort!.send({id: getCounter()});
} // Send the receive port
}
return Future.value();
}
void changeMessage() {
print('Message has changed to ${message.value}');
if (message.value == 'START') {
run = true;
} else {
run = false;
}
}
void runWorker() async {
message.addListener(changeMessage);
print('Worker running counter down from $counter');
while (run == true) {
// This line appears to run the isolate, but there is no output/feedback to the GUI
// The STOP command does not interrupt operation.
Future.delayed(const Duration(microseconds: 0),
() => runChunk(chunkSize, getDown, setDown, getCounter, setCounter));
// This line runs the isolate with feedback to the GUI
// The STOP command does not interrupt operation.
runChunk(chunkSize, getDown, setDown, getCounter, setCounter);
}
message.removeListener(changeMessage);
}
// Establish listener for messages from the controller
print('worker $id establishing listener');
receivePort.listen((msg) {
print('worker $id has received $msg');
switch (msg) {
case 'START':
print('Worker $id starting run');
message.value = msg;
runWorker();
break;
case 'STOP':
print('Worker $id stopping run');
message.value = msg;
break;
case 'END':
message.removeListener(changeMessage);
receivePort.close;
break;
default:
break;
}
});
}
Comments
Post a Comment