python async with timer and endless loop

I have found two pieces of code for my planned project. The first one runs two periodic timers with asyncio:

class Timer:
    """Repeatedly await an async callback from a background asyncio task.

    The task is scheduled with ``asyncio.ensure_future`` as soon as the
    object is constructed, so an event loop must be available at that point.

    Args:
        interval: Delay passed to ``asyncio.sleep`` before each invocation.
        first_immediately: When true, the very first invocation skips the
            initial sleep.
        timer_name: Label handed to the callback and used in the init
            message. Any printable object is accepted.
        context: Arbitrary object handed to the callback on every call.
        callback: Coroutine function awaited as
            ``callback(timer_name, context, timer)``.
    """

    def __init__(self, interval, first_immediately, timer_name, context, callback):
        self._interval = interval
        self._first_immediately = first_immediately
        self._name = timer_name
        self._context = context
        self._callback = callback
        self._is_first_call = True
        self._ok = True
        self._task = asyncio.ensure_future(self._job())
        # f-string instead of "+" so a non-str name cannot raise after the
        # task has already been scheduled.
        print(f"{timer_name} init done")

    async def _job(self):
        """Run the sleep/callback cycle until cancelled or the callback fails."""
        try:
            while self._ok:
                # Sleep before every call except an "immediate" first one.
                if not (self._is_first_call and self._first_immediately):
                    await asyncio.sleep(self._interval)
                await self._callback(self._name, self._context, self)
                self._is_first_call = False
        except asyncio.CancelledError:
            # Cancellation is the normal shutdown path, not an error: let it
            # propagate so the task ends up in the cancelled state on every
            # Python version instead of being printed as an empty message.
            raise
        except Exception as ex:
            # A failing callback stops this timer; report why.
            print(ex)

    def cancel(self):
        """Stop the loop and cancel the background task."""
        self._ok = False
        self._task.cancel()

async def timer1(timer_name, context, timer):
    """Bump ``context['count']`` and print the new value with the timer name.

    ``timer`` is accepted to match the callback signature but is not used.
    """
    context['count'] += 1
    current = context['count']
    message = 'callback: ' + timer_name
    message += ", count: " + str(current)
    print(message)


async def timer2(timer_name, context, timer):
    """Increment the counter stored in ``context`` and print it.

    The ``timer`` argument is part of the callback signature and is ignored.
    """
    context['count'] += 1
    pieces = ('callback: ', timer_name, ", count: ", str(context['count']))
    print("".join(pieces))

# Install an event loop *before* creating the timers: Timer.__init__ calls
# asyncio.ensure_future(), which needs a current loop to schedule its task on.
# Relying on an implicitly created loop is deprecated and fails on recent
# Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

t1 = Timer(interval=1, first_immediately=True, timer_name="Timer Channel", context={'count': 0}, callback=timer1)
t2 = Timer(interval=3, first_immediately=False, timer_name="Timer Scroll", context={'count': 0}, callback=timer2)

try:
    loop.run_forever()
except KeyboardInterrupt:
    t1.cancel()
    t2.cancel()
    # A cancel request only takes effect once the loop runs again, so spin
    # the loop until every remaining task has actually finished. Anything
    # still pending is cancelled too, so this wait cannot hang.
    leftovers = asyncio.all_tasks(loop)
    for task in leftovers:
        task.cancel()
    loop.run_until_complete(asyncio.gather(*leftovers, return_exceptions=True))
    print("clean up done")
finally:
    # Release the loop's resources on every exit path.
    loop.close()

The second one uses an endless loop to check for input events:

while True:
for key, mask in selector.select():
    device = key.fileobj

    for event in device.read():
        ecode = event.code
        if event.type == evdev.ecodes.EV_KEY:
            match event.code:
                case 114:
                    print("Vol down")
                case 115:
                    print("Vol up")

Each piece works as it should on its own, but I don't know how to combine them so that the timers and the event-checking loop run concurrently without blocking each other.

Best regards



Comments

Popular posts from this blog

Today Walkin 14th-Sept

Spring Elasticsearch Operations

Hibernate Search - Elasticsearch with JSON manipulation