How to use async with, which is available from Python 3.5 onward. See PEP 492 -- Coroutines with async and await syntax.
What is async with? It is the asynchronous version of the context manager.
Suppose you have code like this:
async with MyDB() as db:
    db.do_something()
If MyDB is implemented appropriately, the db connection is closed automatically when execution leaves the async with block.
For example, let's create a class that wraps aiomysql and use it.
my_db.py
import asyncio
import aiomysql
class MyDB:
    #Called just before entering async with.
    async def __aenter__(self):
        loop = asyncio.get_event_loop()
        self._connection = await aiomysql.connect(
            host='localhost',
            port=3306,
            user='root',
            password='ultrastrongpassword',
            db='my_db',
            charset='utf8',
            loop=loop
        )
        return self
    #Called immediately after exiting the async with block.
    async def __aexit__(self, exc_type, exc, tb):
        self._connection.close()
    async def fetchall(self, query, args=None):
        cursor = await self._connection.cursor()
        await cursor.execute(query, args)
        return await cursor.fetchall()
The class hides the aiomysql connection details from the caller.
The important parts are __aenter__ and __aexit__.
By implementing these, the class can be used as an asynchronous context manager.
They are called at the times noted in the comments.
As in the example, the common pattern is to set up resources in __aenter__ and release them in __aexit__.
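To see the release step run even when the block fails, here is a minimal sketch that does not touch a real database. FakeResource and its events list are hypothetical names made up for this illustration; only the __aenter__ / __aexit__ protocol itself comes from PEP 492.

```python
import asyncio


class FakeResource:
    """Hypothetical stand-in for a DB wrapper; no real connection is made."""

    def __init__(self):
        self.events = []

    async def __aenter__(self):
        self.events.append("setup")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # exc_type is None on a normal exit, otherwise the exception class.
        self.events.append(("release", exc_type))
        # Returning None (falsy) lets the exception propagate to the caller.


async def main():
    resource = FakeResource()
    try:
        async with resource:
            raise ValueError("query failed")
    except ValueError:
        resource.events.append("caught")
    return resource.events


events = asyncio.run(main())
print(events)
```

The release entry is recorded before the exception reaches the except clause, which is why __aexit__ is a natural place to close a connection.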
Let's also look at the caller.
Caller side
from my_db import MyDB
class Hoge:
    async def call_db(self):
        # The line below has the same meaning as:
        #     db = MyDB()
        #     async with db:
        async with MyDB() as db:
            # fetchall is a coroutine, so it must be awaited.
            result = await db.fetchall('select * from some_table')
        print(result)
The format is `async with [asynchronous context manager instance] as [return value of __aenter__]:`.
When this is executed, the processing will proceed in the following order.
1. An instance of MyDB is created (__init__ is called)
2. __aenter__ is called
3. The return value of __aenter__ in MyDB is stored in db of `async with ... as db`
4. result = await db.fetchall('select * from some_table') is executed
5. __aexit__ is called
In short, the db instance can use its resources only while it is inside async with.
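The order above can be checked with a small trace. This is only a sketch: TracedDB and log are hypothetical names, and fetchall returns a canned row instead of querying a database.

```python
import asyncio

log = []  # records the order in which the methods run


class TracedDB:
    """Hypothetical stand-in for MyDB that only logs its method calls."""

    def __init__(self):
        log.append("__init__")

    async def __aenter__(self):
        log.append("__aenter__")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        log.append("__aexit__")

    async def fetchall(self, query):
        log.append("fetchall")
        return [("row",)]  # canned result, no real query is executed


async def main():
    async with TracedDB() as db:
        result = await db.fetchall("select * from some_table")
    log.append("after block")
    return result


result = asyncio.run(main())
print(log)
```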
When you want to write `async with MyDB.connect() as db:`
Caller side
class Hoge:
    async def call_db(self):
        async with MyDB.connect() as db:
            result = await db.fetchall('select * from some_table')
        print(result)
Just implement the connect method as you normally would:
my_db.py
class MyDB:
    __slots__ = ['_connection']
    async def __aenter__(self):
        ...  # omitted (same as above)
    async def __aexit__(self, exc_type, exc, tb):
        self._connection.close()
    async def fetchall(self, query, args=None):
        ...  # omitted (same as above)
    @classmethod
    def connect(cls):
        return cls()
All you need is an instance of MyDB (asynchronous context manager) after async with.
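As a runnable illustration of that point, the sketch below uses a plain classmethod factory and checks that setup and release still happen in __aenter__ and __aexit__. FakeDB, its name parameter, and the is_open flag are hypothetical and exist only for this example.

```python
import asyncio


class FakeDB:
    """Hypothetical stand-in; it only records whether it is 'open'."""

    def __init__(self, name):
        self.name = name
        self.is_open = False

    async def __aenter__(self):
        self.is_open = True
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.is_open = False

    @classmethod
    def connect(cls, name="my_db"):
        # A plain (non-async) factory: it only builds the instance.
        # The actual setup still happens in __aenter__.
        return cls(name)


async def main():
    async with FakeDB.connect() as db:
        inside = db.is_open
    return inside, db.is_open


inside, outside = asyncio.run(main())
print(inside, outside)
```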
When you want to write `async with MyDB.direct_fetch(...) as rows:`
Caller side
class Hoge:
    async def call_db(self):
        async with MyDB.direct_fetch('select * from some_table') as rows:
            print(rows)
In this case, what async with receives from MyDB.direct_fetch is unlikely to be a MyDB instance, so a little more thought is needed. The implementation of get in aiohttp may be a helpful reference.
It can be handled by creating a private context manager class as shown below.
my_db.py
class _QueryContextManager:
    def __init__(self, coro):
        self._coro = coro
    async def __aenter__(self):
        loop = asyncio.get_event_loop()
        self._connection = await aiomysql.connect(
            host='localhost',
            port=3306,
            user='root',
            password='ultrastrongpassword',
            db='my_db',
            charset='utf8',
            loop=loop
        )
        return await self._coro(self._connection)
    async def __aexit__(self, exc_type, exc, tb):
        self._connection.close()
class MyDB:
    __slots__ = ['_connection']
    @classmethod
    def direct_fetch(cls, query, args=None):
        async def fetch(connection):
            cursor = await connection.cursor()
            await cursor.execute(query, args)
            return await cursor.fetchall()
        #Returns a private context manager.
        return _QueryContextManager(coro=fetch)
Hmm. I can't deny that this feels a bit forced. There may be a better way to write it.
In aiohttp, the connection can be released through the object returned by request (self._resp.release()), so its implementation looks a little cleaner.
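One possible alternative, not taken from the code above, is contextlib.asynccontextmanager from the standard library (Python 3.7 and later), which builds the context manager from a single async generator. The sketch below assumes a hypothetical FakeConnection in place of aiomysql, and the connections list exists only so the example can inspect the connection afterwards.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeConnection:
    """Hypothetical stand-in for an aiomysql connection."""

    def __init__(self):
        self.closed = False

    async def fetch(self, query):
        return [("row for " + query,)]  # canned result

    def close(self):
        self.closed = True


connections = []  # only for inspecting the connection in this example


@asynccontextmanager
async def direct_fetch(query):
    connection = FakeConnection()
    connections.append(connection)
    try:
        # The yielded value is what `as rows` receives.
        yield await connection.fetch(query)
    finally:
        # Runs when the async with block exits, with or without an error.
        connection.close()


async def main():
    async with direct_fetch("select * from some_table") as rows:
        return rows, connections[-1].closed


rows, closed_inside = asyncio.run(main())
print(rows, closed_inside, connections[-1].closed)
```

Whether this is cleaner than a private context manager class is a matter of taste; it mainly saves writing __aenter__ and __aexit__ by hand.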