RxJS waiting for response before sending next command over UDP

I am currently working on a project where I send commands to a Tello drone over UDP. The problem is that when I send a command before the previous one has finished, the second command is ignored. I am using RxJS for this project, and I want to create a mechanism that waits for the drone's response ("ok" or "error") before sending the next command.

My idea is to have two observables: one that is the input stream of responses from the drone, and one observable of observables that I use as a commandQueue. The commandQueue holds simple observables, each carrying one command I want to send. I only want to send the next command once I have received the "ok" message from the response observable. When I get the "ok", I complete the simple command observable, and the next value on the commandQueue, i.e. the next command, should then be picked up automatically.

My code works only when I send an array of commands in one go, but I want to call the function multiple times, sending the commands one by one. The following code is the function in question; testSubject is an observable used to send the next command to the drone.

async send_command_with_return(msg) {
    let parentobject = this;

    let zeroTime = timestamp();
    const now = () => numeral((timestamp() - zeroTime) / 10e3).format("0.0000");

    const asyncTask = data =>
      new Observable(obs => {
        console.log(`${now()}: starting async task ${data}`);

        
        parentobject.Client.pipe(take(1)).subscribe(
          dataa => {
            console.log("loool")
            obs.next(data);
            this.testSubject.next(data);
            console.log(`${now()}: end of async task ${data}`);
            obs.complete();
          },
          err => console.error("Observer got an error: " + err),
          () => console.log("observer asynctask finished with " + data + "\n")
        );
      });

    let p = this.commandQueue.pipe(concatMap(asyncTask)).toPromise(P); //commandQueue is a subject in the constructor

    console.log("start filling queue with " + msg);
    zeroTime = timestamp();
    this.commandQueue.next(msg);
    //["streamon", "streamoff", "height?", "temp?"].forEach(a => this.commandQueue.next(a));

    await p;

    // this.testSubject.next(msg);
  }

  streamon() {
    this.send_command_with_return("streamon");
  }

  streamoff() {
    this.send_command_with_return("streamoff");
  }

  get_speed() {
    this.send_command_with_return("speed?");
  }

  get_battery() {
    this.send_command_with_return("battery?");
  }
}

let tello = new Tello();
tello.init();

tello.streamon();
tello.streamoff();


from Recent Questions - Stack Overflow https://ift.tt/3snXe9M