python async with timer and endless loop

I have found two parts of code here for my planned project. The first with two timers ...

class Timer:
    def __init__(self, interval, first_immediately, timer_name, context, callback):
        self._interval = interval
        self._first_immediately = first_immediately
        self._name = timer_name
        self._context = context
        self._callback = callback
        self._is_first_call = True
        self._ok = True
        self._task = asyncio.ensure_future(self._job())
        print(timer_name + " init done")

    async def _job(self):
        try:
            while self._ok:
                if not self._is_first_call or not self._first_immediately:
                    await asyncio.sleep(self._interval)
                await self._callback(self._name, self._context, self)
                self._is_first_call = False
        except Exception as ex:
            print(ex)

    def cancel(self):
        self._ok = False
        self._task.cancel()

async def timer1(timer_name, context, timer):
    context['count'] += 1
    print('callback: ' + timer_name + ", count: " + str(context['count']))


async def timer2(timer_name, context, timer):
    context['count'] += 1
    print('callback: ' + timer_name + ", count: " + str(context['count']))

t1 = Timer(interval=1, first_immediately=True, timer_name="Timer Channel", context={'count': 0}, callback=timer1)
t2 = Timer(interval=3, first_immediately=False, timer_name="Timer Scroll", context={'count': 0}, callback=timer2)

try:
    loop = asyncio.get_event_loop()
    loop.run_forever()
except KeyboardInterrupt:
    t1.cancel()
    t2.cancel()
    print("clean up done")

and the second with an endless loop for checking events ...

while True:
for key, mask in selector.select():
    device = key.fileobj

    for event in device.read():
        ecode = event.code
        if event.type == evdev.ecodes.EV_KEY:
            match event.code:
                case 114:
                    print("Vol down")
                case 115:
                    print("Vol up")

Everything works as it should, but I don't know how to set it up so that everything runs independently without interfering with the other processes.

Best regards



Comments

Popular posts from this blog

Spring Elasticsearch Operations

Today Walkin 14th-Sept

Object oriented programming concepts (OOPs)