稼働中

Raspberry Pi Pico(s_25)uasyncio の動作例 Lock

uasyncio.Lock()

Raspberry Pi PicoのMicroPythonにはmodule’uasyncio’があります。
uasyncioを使うと非同期動作するそうです。並行処理ができるようです。
uasyncioを使ってIRセンサー FC51の検知動作を確認します。
IRセンサー FC51の外観、接続例は記事d_28を参照ください。

記事s_23では、Sens()でIRセンサーの常時検出、LED1()常時点滅、LED2()検出後5回点滅動作を並行処理していました。センサーの検出はフラグで認識していました。そのためLED2()処理時もSens()動作が行われていました。
module’uasyncio’にあるclass ‘Lock’でSens()とLED2()をLockすればLED2()処理時にSens()動作をブロックできそうです。
素人なので詳しくはわかりません。わかる範囲でLockの動作を確認してみようと思います。
※開発環境はThonnyです。ThonnyでMicroPythonをRaspberry Pi Pico with RP2040にインストールして使っています。

class ‘Lock’

class ‘Lock’にはlocked、release、acquireのメソッドがあります。
lock = uasyncio.Lock()で初期化した場合の例になります。
■lock.acquire()
ロックを取得します。ロックを取得して実行されるコルーチンは、最初にロックの待ち受けをしたコルーチンです。
■lock.release()
ロックを開放します。
■lock.locked()
ロック状態をTrue/Falseで返します。

Lock 動作

Lock動作の雰囲気だけ確認します。
単純に以下の時間を脳待機するだけの関数aaa、bbb、cccとします。
aaa 2sec 待機
bbb 1sec 待機
ccc 0.4sec 3回繰り返し
「aaa > bbb > ccc」で進行するものとします。

すべてをawait uasyncio.sleep(?)で処理すると、並行処理されるので
aaaは2sec後、bbbは1sec後、cccは1.2秒後に終了します。
終わる順番はbbb、ccc、aaaになります。
await uasyncio.sleep

上記のbbbとcccにロック処理すると、bbbが開始されるとcccをブロックします。cccの実行は1sec後bbbが終了した後になります。結果、cccの終了は開始から1.0+1.2≒2.2sec後になります。aaaはロックされていないので2sec後に終了します。
終わる順番はbbb、aaa、cccになります。
uasyncio.Lock

以下のスクリプトで動作を確認しました


#lock_test_02b.py
from machine import Pin
import time
import uasyncio

async def aaa():
    global st
    et=uasyncio.ticks()  
    print('aaa>>>>>>>>>>>>>>>>',et-st)
    await uasyncio.sleep(2)
    et=uasyncio.ticks()  
    print('>>>>>>>>>>>>>>>>aaa',et-st)

async def bbb(lock):
    global st
    await lock.acquire()    #ロック取得
    et=uasyncio.ticks()
    print('bbb>>>>>>>>>>>>>>>>',et-st)
    await uasyncio.sleep(1)
    et=uasyncio.ticks()  
    print('>>>>>>>>>>>>>>>>bbb',et-st)
    lock.release()          #ロック解放
    print('release>>>>>>>>>bbb',et-st)

async def ccc(lock):
    global st
    await lock.acquire()
    for i in range(3):
        et=uasyncio.ticks()  
        print('ccc>>>>>>>>>>>>>>>>',et-st)
        await uasyncio.sleep(0.4)
        et=uasyncio.ticks()  
        print('>>>>>>>>>>>>>>>>ccc',et-st)
    lock.release()
    print('release>>>>>>>>>ccc',et-st)

async def main():
    global st
    st=uasyncio.ticks()
    # create a shared lock 共有ロックを作成する
    lock = uasyncio.Lock()
    task1 = uasyncio.create_task(aaa())
    task2 = uasyncio.create_task(bbb(lock)) #ロック使用を設定
    task3 = uasyncio.create_task(ccc(lock)) #ロック使用を設定
    await task1 
    await task2
    await task3
    
uasyncio.run(main())

実行すると以下のようになります。右端の数値は開始からの経過時間です。
なんとなくLock動作がわかったような気がします。


>>> %Run -c $EDITOR_CONTENT
aaa>>>>>>>>>>>>>>>> 15
bbb>>>>>>>>>>>>>>>> 16
>>>>>>>>>>>>>>>>bbb 1016
release>>>>>>>>>bbb 1016    # bbb終了 約1sec後
ccc>>>>>>>>>>>>>>>> 1017
>>>>>>>>>>>>>>>>ccc 1417
ccc>>>>>>>>>>>>>>>> 1417
>>>>>>>>>>>>>>>>ccc 1818
ccc>>>>>>>>>>>>>>>> 1818
>>>>>>>>>>>>>>>>aaa 2016    # aaa終了 約2sec後
>>>>>>>>>>>>>>>>ccc 2218
release>>>>>>>>>ccc 2218    # ccc終了 約2.2sec後
>>> 

スクリプト

Lockの動作がなんとなくわかったので、
記事d_29のSens():IRセンサーの常時検出、LED2():検出後5回点滅動作にロックを設定してLED2()処理時にSens()動作をブロックするように変更します。LED2()処理時もLED1()は並行動作します。
スクリプトは以下のようにしました。


led_while_lock_01b.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from machine import Pin
import time
import uasyncio

#OUT Pin
led1=Pin(22, Pin.OUT, value=0)  #R-LED(常時点滅)
led2=Pin(26, Pin.OUT, value=0)  #Y-LED(検出点滅)
#led1.value(0,1) led1(0,1) led1.high(),led1.low()

#FC-51 IR Detector Pin
dpin=Pin(27, Pin.IN, Pin.PULL_DOWN)  # 検出 dpin='L'

#LED1は常時点滅       
async def LED1():
    global detect
    print('LED1------')
    while True:
        led1.high()
        print('LED1  on')
        await uasyncio.sleep(0.2)
        led1.low()
        await uasyncio.sleep(0.2)
        print('------LED1 ',detect)


#FC-61常時監視 検出したらdetect=1 
async def Sens(lock):
    global detect               # detect:検出フラグ
    print('Sens------')
    while True:
        await lock.acquire()    #ロック取得
        if dpin.value()==0:
            print('detect')
            detect=1            
        print('------Check',detect) 
        await uasyncio.sleep(0.5)
        lock.release()#ロック解放

#検出後の処理  led2点滅 5回
async def LED2(lock):
    global detect               # detect:検出フラグ
    print('LED2------')
    while True:        
        if detect==1:
            await lock.acquire()#ロック取得
            for i in range(5):
                print(f'LED2  blink({i})')
                led2.high()
                await uasyncio.sleep(0.2)
                led2.low()
                await uasyncio.sleep(0.2)
            print('------LED2 reset')
            detect=0
            lock.release()#ロック解放
        else:
            print('LED2 wait 1sec')
            await uasyncio.sleep(1)
        print('------LED2 ',detect)

#非同期処理
async def main():
    global detect               # detect:検出フラグ
    detect=0
    
    lock = uasyncio.Lock()                  #共有ロックを作成
    task1 = uasyncio.create_task(LED1())
    task2 = uasyncio.create_task(LED2(lock))#ロック使用を設定
    task3 = uasyncio.create_task(Sens(lock))#ロック使用を設定

    await task1
    await task2
    await task3

uasyncio.run(main())

実行結果

結果は以下のようになりました。※Thonnyのshellに表示されます。
出力表示だけなのでわかり難いですが、task2、task3がLock処理されています。


>>> %Run -c $EDITOR_CONTENT
LED1------
LED1  on
LED2------
LED2 wait 1sec
Sens------
------Check 0
------LED1  0
LED1  on
detect          #検知
------Check 1
------LED1  1
LED1  on
------LED2  1   #LED2点滅 task2
LED2  blink(0)  #LED2点滅終わるまでSens(task3)はブロック
------LED1  1   #LED1(task1)は並行動作している
LED1  on
LED2  blink(1)
------LED1  1
LED1  on
LED2  blink(2)
------LED1  1
LED1  on
LED2  blink(3)
------LED1  1
LED1  on
LED2  blink(4)
------LED1  1
LED1  on
------LED2 reset       #LED2点滅終わり
------LED2  0
LED2 wait 1sec
------Check 0
省略
KeyboardInterrupt: 
>>> 

まとめ

uasyncioのclass ‘Lock’の使い方が少しわかったような気がします。