Sanic允许您使用流方法将内容流式传输到客户端。此方法接受对应的回调函数,该函数传递一个写入的StreamingHTTPResponse对象。

from sanic import Sanic
from sanic.response import stream

app = Sanic(__name__)

@app.route("/")
async def test(request):
    async def sample_streaming_fn(response):
        response.write('foo,')
        response.write('bar')

    return stream(sample_streaming_fn, content_type='text/csv')

这在您想将内容流传输到外部服务(如数据库)的客户端的情况下非常有用。例如,您可以使用asyncpg提供的异步游标将数据库记录流式传输到客户端:

@app.route("/")
async def index(request):
    async def stream_from_db(response):
        conn = await asyncpg.connect(database='test')
        async with conn.transaction():
            async for record in conn.cursor('SELECT generate_series(0, 10)'):
                response.write(record[0])

    return stream(stream_from_db)

results matching ""

    No results matching ""