How to break up Dart Isolate code to avoid blocking the event queue?

I am writing a test program to explore the use of Isolates in Dart/Flutter. One type of isolate that I have is started and stopped using a switch on a Flutter UI. This sends a message (START and STOP) to the isolate and I use a ValueNotifier to detect these commands and respond to them. When I initially wrote this, the Isolate ran continuously and didn’t respond the to the STOP command, which I understand is because the event queue would never be empty to process it.

Based on the first answer to this thread... How to terminate a long running isolate #2

… and the suggested approach on this blog page: https://hackernoon.com/executing-heavy-tasks-without-blocking-the-main-thread-on-flutter-6mx31lh

… I have tried to break my code up using Futures. I have split my run block into runWorker() and runChunk() functions, with runChunk called repeatedly as long as the Isolate should be running (run == true). I am doing something wrong because the Isolate still runs away and does not process the STOP command. I get slightly different results depending on whether I call runChunk directly or using Future.delayed (as per the hackernoon blog page), but neither approach works.

I believe that the STOP code works because if I remove the processing loop then everything triggers as expected, and I had an earlier version of this that worked when I included a 'Future.delayed' of 1 microsecond between each counter loop. So assume I am just using Futures incorrectly and not freeing up the event queue between 'runChunk' calls.

Can anyone tell me what I am doing wrong here? Here is the code for my isolate...

import 'dart:async';
import 'dart:isolate';

import 'package:flutter/material.dart';

class ContinuousIsolator {
  ContinuousIsolator(
      {required int channel, required void Function(double) setCounter}) {
    print('Isolator initialisation');
    _channel = channel;
    _setCounter = setCounter;
    spawn();
  }
  late int _channel; // The id of this isolate
  late void Function(double) _setCounter;
  late SendPort _port;
  late Isolate _isolate;
  ReceivePort receivePort = ReceivePort();

  // Spawn a new isolate to complete the countdown (or up)
  // channel = the number of the isolate
  // counter = the value to count down from (or up to)
  void spawn() {
    print('Isolator establishing receiver');
    receivePort.listen((msg) {
      // print('Isolator message received');

      // Unpack the map from the returned string
      Map<int, dynamic> map = Map<int, dynamic>.from(msg);

      // There should be only one key:value pair
      for (var key in map.keys) {
        msg = map[key]; // Extract the message
      }
      // print('Channel $_channel received "$msg" of type ${msg.runtimeType}');

      // If we have received a Sendport, then add it to the port map
      if (msg is SendPort) {
        _port = msg;
      } else {
        // Otherwise process the message
        // If it contains 'END' then we need to terminate the isolate
        switch (msg) {
          case 'END':
            _isolate.kill();
            // Isolate has completed, then close this receiver port
            receivePort.close();
            break;
          default:
            _setCounter(msg); // Send message to display
            break;
        }
      }
    });

    // Start the isolate then let's get working on the countdown timer
    Isolate.spawn(worker, {_channel: receivePort.sendPort}).then((isolate) {
      _isolate = isolate; // Capture isolate so we can kill it later
    });
  }

  void run() {
    print('Sending START to worker');
    _port.send('START');
  }

  void stop() {
    print('Sending STOP to worker');
    _port.send('STOP');
  }

  void end() {
    _port.send('END'); // Send counter value to start countdown
  }
}

void worker(Map<int, dynamic> args) {
  int? id; // Number for this channel
  ReceivePort receivePort = ReceivePort(); // Receive port for
  SendPort? sendPort;
  ValueNotifier<String> message = ValueNotifier('');
  const double start = 10000000;
  double counter = start;
  const int chunkSize = 1000;
  bool down = true;
  bool run = true;

  // Unpack the args to get the id and sendPort.
  // There should be only one key:value pair
  dynamic msg = '';
  Map<int, dynamic> map = Map<int, dynamic>.from(args);
  for (var key in map.keys) {
    id = key; // Extract the isolate id
    msg = map[key]; // Extract the message
  }
  // First message should contain the receivePort for the main isolate
  if (msg is SendPort) {
    sendPort = msg;
    // print('args: $args    port: $sendPort');
    print('worker $id sending send port');
    sendPort.send({id: receivePort.sendPort});
  }

  double getCounter() {
    return counter;
  }

  void setCounter(double value) {
    counter = value;
  }

  bool getDown() {
    return down;
  }

  void setDown(bool value) {
    down = value;
  }

  Future runChunk(
      int chunkSize,
      bool Function() getDown,
      void Function(bool) setDown,
      double Function() getCounter,
      void Function(double) setCounter) {
    const double start = 10000000;

    print('Running chunk, counter is ${getCounter()}');
    for (int i = 0; i < chunkSize; i++) {
      // print('Worker $id in the while loop');
      if (getDown() == true) {
        setCounter(getCounter() - 1);
        if (getCounter() < 0) setDown(!getDown());
      } else {
        setCounter(getCounter() + 1);
        if (getCounter() > start) setDown(!getDown());
      }
      if ((getCounter() ~/ 1000) == getCounter() / 1000) {
        // print('Continuous Counter is ${getCounter()}');
        sendPort!.send({id: getCounter()});
      } // Send the receive port
    }
    return Future.value();
  }

  void changeMessage() {
    print('Message has changed to ${message.value}');
    if (message.value == 'START') {
      run = true;
    } else {
      run = false;
    }
  }

  void runWorker() async {
    message.addListener(changeMessage);

    print('Worker running counter down from $counter');
    while (run == true) {
      // This line appears to run the isolate, but there is no output/feedback to the GUI
      // The STOP command does not interrupt operation.
      Future.delayed(const Duration(microseconds: 0),
          () => runChunk(chunkSize, getDown, setDown, getCounter, setCounter));
      // This line runs the isolate with feedback to the GUI
      // The STOP command does not interrupt operation.
      runChunk(chunkSize, getDown, setDown, getCounter, setCounter);
    }

    message.removeListener(changeMessage);
  }

  // Establish listener for messages from the controller
  print('worker $id establishing listener');
  receivePort.listen((msg) {
    print('worker $id has received $msg');
    switch (msg) {
      case 'START':
        print('Worker $id starting run');
        message.value = msg;
        runWorker();
        break;
      case 'STOP':
        print('Worker $id stopping run');
        message.value = msg;
        break;
      case 'END':
        message.removeListener(changeMessage);
        receivePort.close;
        break;
      default:
        break;
    }
  });
}


Comments

Popular posts from this blog

Today Walkin 14th-Sept

Hibernate Search - Elasticsearch with JSON manipulation

Spring Elasticsearch Operations