RxJS waiting for response before sending next command over UDP
I am working on a project where I send UDP commands to a Tello drone. The problem is that it uses UDP, and when I send a command before the previous one has finished, the second command/action doesn't take place. I am using RxJS for this project and I want to create a mechanism that waits for the response ("ok" or "error") from the drone before sending the next command.

My idea is to have two observables: one that is the input stream of responses from the drone, and one observable of observables that I use as a commandQueue. The commandQueue holds simple observables, each carrying one command I want to send. I only want to send the next command once I have received the "ok" message from the response observable. When I get the "ok", I complete the simple command observable, and the next value on the commandQueue (the next command) is picked up automatically.

My code works only when I send an array of commands, but I want to call the function multiple times, sending the commands one by one. The following code is the function in question; testSubject is the observable I use to send the next command to the drone.
  async send_command_with_return(msg) {
    let parentobject = this;
    let zeroTime = timestamp();
    const now = () => numeral((timestamp() - zeroTime) / 10e3).format("0.0000");
    const asyncTask = data =>
      new Observable(obs => {
        console.log(`${now()}: starting async task ${data}`);
        parentobject.Client.pipe(take(1)).subscribe(
          dataa => {
            console.log("loool")
            obs.next(data);
            this.testSubject.next(data);
            console.log(`${now()}: end of async task ${data}`);
            obs.complete();
          },
          err => console.error("Observer got an error: " + err),
          () => console.log("observer asynctask finished with " + data + "\n")
        );
      });
    let p = this.commandQueue.pipe(concatMap(asyncTask)).toPromise(P); //commandQueue is a subject in the constructor
    console.log("start filling queue with " + msg);
    zeroTime = timestamp();
    this.commandQueue.next(msg);
    //["streamon", "streamoff", "height?", "temp?"].forEach(a => this.commandQueue.next(a));
    await p;
    // this.testSubject.next(msg);
  }
  streamon() {
    this.send_command_with_return("streamon");
  }
  streamoff() {
    this.send_command_with_return("streamoff");
  }
  get_speed() {
    this.send_command_with_return("speed?");
  }
  get_battery() {
    this.send_command_with_return("battery?");
  }
}
let tello = new Tello();
tello.init();
tello.streamon();
tello.streamoff();
from Recent Questions - Stack Overflow https://ift.tt/3snXe9M