domingo, 10 de agosto de 2014

Asyncio e corotinas

Continuando a série sobre o módulo asyncio do Python 3.4, vou apresentar as corotinas e como elas simplificam a escrita de nossos programas com o loop de eventos. Com a saída do Python 3.4, eu atualizei o livro de Introdução à Programação com Python. Alguns assuntos fogem ao escopo do livro que é destinado a iniciantes. Eu estou continuando uma série de posts curtos sobre alguns tópicos que acho interessantes e quem sabe até podem virar base para um novo livro. Clique aqui para ler o primeiro artigo.

No artigo anterior, apresentamos uma chamada ao loop de eventos bem simples:
import asyncio

def print_and_repeat(loop):
    print('Hello World')
    loop.call_later(2, print_and_repeat, loop)

loop = asyncio.get_event_loop()
loop.call_soon(print_and_repeat, loop)
loop.run_forever()

Vejamos como reescrever este exemplo simples usando corotinas:
import asyncio

@asyncio.coroutine
def print_and_repeat(loop):
    while True:
        print('Hello World')
        yield from asyncio.sleep(2)

loop = asyncio.get_event_loop()
try:
 loop.run_until_complete(print_and_repeat(loop))
finally:
 loop.close()

Este exemplo foi extraído da documentação do Python, vamos ver o que mudou. A principal mudança no início do programa é o uso do decorador @asyncio.coroutine. Este decorador transforma nossa função em uma corotina e permite a utilização do yield from, como definido na PEP380. Veja que o cabeçalho da função não foi alterado, mas que substituímos a chamada de loop.call_later pela combinação de um while True com um yield from no final. Uma corotina pode ser suspensa e esperar o processamento de uma outra corotina. No caso, asyncio.sleep(2) é uma corotina do módulo asyncio que suspende a execução da função pelo número de segundos passados como parâmetro. Na realidade, esta chamada retorna uma corotina que é marcada como completa após os 2 segundos. Isto pode ser realizado pois no yield from, criamos a nova corotina e indicamos ao loop que não continue a executar print_and_repeat até que a nova corotina esteja concluída. A partir deste ponto, a execução volta ao loop de eventos que monitora a conclusão da nova corotina criada, suspendendo a execução da anterior. Uma vez que o a corotina do sleep é concluída, após 2 segundos, o loop reativa a chamada suspensa de print_and_repeat e a execução continua, voltando para o while. Parece complicado, mas veja como ficou fácil de escrevermos a função. Fica bem mais claro nossa intenção de realizar uma repetição do print('Hello World') a cada 2 segundos.

Modificamos também a chamada de execução da corotina, pois agora utilizamos loop.run_until_complete para iniciar nossa corotina principal. Aproveitamos para colocar tudo entre um try...finally para terminar a execução do loop corretamente (mesmo em caso de exceção). Perceba que no exemplo anterior, com call_soon, passamos a função e seus parâmetros, mas não executamos a função em si. No caso de run_until_complete, estamos passando o retorno da chamada de print_and_repeat que é uma corotina, uma vez que a marcamos com o decorador @asyncio.coroutine.

No post anterior comparamos a velocidade de execução entre as várias formas de se executar código em paralelo com Python. Agora veremos como usar o asyncio para criar uma aplicação prática, como um cliente e um servidor TCP/IP, mas indo além dos exemplos da documentação do Python. É preciso lembrar que o módulo asyncio ainda é muito novo e que tanto a documentação quanto a implementação de algumas funcionalidades ainda estão sendo alteradas.

Vamos começar pelo servidor. Um servidor TCP/IP é um exemplo clássico de programa chato a escrever. Normalmente, você pode escolher utilizar threads ou se aventurar com select e chamadas não bloqueantes para gerenciar várias conexões. Este problema se agrava em aplicações mais complexas, onde algum processamento precisa ser realizado antes de se gerar a resposta, por exemplo, a um comando do usuário. Usando o módulo asyncio, esta tarefa fica bem mais fácil. Primeiro, porque o tratamento de dados é gerenciado por uma classe, responsável pelo protocolo. Esta classe traz métodos que são chamados em situações comuns ao programarmos um servidor TCP/IP, como chegada de uma nova conexão, desconexão, chegada de dados para leitura entre outras. Além disso, o asyncio também traz classes especializadas em quebrar os dados em linhas, o que facilita a implementação de protocolos com comandos em formato texto, terminados por enter (CR).

O servidor é controlado por uma classe chamada EchoServer, pois o desenvolvi a partir do servidor de Echo dado como exemplo na documentação, mas com alguns detalhes que observei no código do módulo asyncio. O protocolo implementado é bem simples, a cada linha, a data e hora atuais são enviadas. Se o cliente enviar sair a conexão é terminada. Vamos ver o programa completo e discutir parte por parte.

import asyncio
import time
from common import *

class EchoServer(asyncio.streams.FlowControlMixin):    
    ativas = 0
    def connection_made(self, transporte):        
        peername = transporte.get_extra_info('peername')
        print('Conexão de {}'.format(peername))
        EchoServer.ativas+=1
        print("Conexões ativas: {}".format(EchoServer.ativas))
        self.transporte = transporte
        self.leitor = asyncio.StreamReader()
        self.leitor.set_transport(self.transporte)
        self.escritor = asyncio.StreamWriter(transport=self.transporte, protocol=self, reader=self.leitor, loop=asyncio.get_event_loop())        
        asyncio.async(self.gerencia())
    
    @asyncio.coroutine
    def gerencia(self):        
        while True:
            dados = yield from self.leitor.readline()                        
            self.escritor.write(strToByte(time.strftime("%c")+"\r\n"))            
            yield from self.escritor.drain()
            comando = byteToStr(dados).strip().lower()
            if(comando == "sair"):
                self.transporte.close()
                return

    def connection_lost(self, exp):                
        EchoServer.ativas-=1
        print("Conexões ativas: {}".format(EchoServer.ativas))
        super().connection_lost(exp)

    def data_received(self, dados):                
        print('data received: {0}'.format(byteToHex(dados)))
        print('     received: {0}'.format(strPrintable(dados)))
        print('       string: {0}'.format(byteToStr(dados)))
        self.leitor.feed_data(dados)
        

loop = asyncio.get_event_loop()
coro = loop.create_server(EchoServer, '127.0.0.1', 8888)
servidor = loop.run_until_complete(coro)
print('Escutando {}'.format(servidor.sockets[0].getsockname()))

try:
    loop.run_forever()
except KeyboardInterrupt:
    print("exit")
finally:
    servidor.close()    
    loop.close()

A classe EchoServer herda de uma classe fornecida em asyncio.streams, chamada FlowControlMixin. A classe FlowControlMixin é por sua vez derivada de Protocols, também fornecida pelo módulo asyncio. A ideia desta classe é implementar um protocol factory, ou seja, um construtor de instâncias responsáveis pela implementação da gestão de cada nova conexão. Um protocolo normal, precisa herdar apenas de Protocols, mas para utilizar alguns métodos para leitura buferizada de linhas, especialmente o write.drain que veremos logo após, a implementação contida em FlowControlMixin é interessante.

Aproveitamos a nova classe para contar o número de conexões ativas. Cada nova conexão recebida por nosso servidor TCP/IP chama o construtor de nossa classe e o método connection_made, passando o transporte (entenda como o socket já conectado) como parâmetro.

Utilizando o parâmetro transporte, chamamos o método get_extra_info('peername') para obter o endereço do cliente que acabou de se conectar ao servidor. Logo em seguida, incrementamos o número de conexões. Veja que como o código que roda no loop de eventos não é multi-threaded, não precisamos de locks ou de outros mecanismos de controle, já que apenas uma função roda a cada vez. O resto do método connection_made prepara as instâncias do leitor e do escritor, objetos das classes StreamReader e StreamWriter respectivamente. Estes objetos vão fornecer corotinas úteis para ler e escrever os dados de forma não bloqueante. Veja que passamos transporte tanto para o escritor quanto para o leitor e que uma série de parâmetros são necessários para a inicialização do escritor.

No fim de connection_made, usamos a função asyncio.async para iniciar o processamento da corotina self.gerencia, dentro do loop de eventos. Veja que o método gerencia foi marcado com o decorador @asyncio.coroutine.

O método gerencia contém uma estrutura de repetição while que espera uma linha do cliente. Veja que utilizamos yield from para suspender a execução de gerencia enquanto self.leitor.readline() não terminar. Neste ponto, a execução volta para o loop de eventos e retorna apenas quando self.leitor.readline() contém uma linha enviada pelo cliente ou caso uma exceção tenha ocorrido. O uso do yield from é fundamental, pois caso o readline() esperasse o cliente enviar a linha para continuar a execução, todo o loop de eventos seria bloqueado. Como o uso do yield from, a execução volta para o loop que é livre para executar outros métodos e outras corotinas. O objetivo é não fazer o computador esperar por dados ou resultados que demoram muito tempo (ou um tempo desconhecido, possivelmente longo, para retornar). Outra característica de yield from é que o resultado do self.leitor.readline() é retornado e no caso, armazenado na variável dados.

A execução segue normalmente e nosso servidor envia a hora e a data atual, veja que uma linha foi acrescentada ao final da string. Este fim de linha é importante, pois como nosso protocolo é em formato texto e orientado a linhas, esperamos o enter (CR) para processar o comando ou a resposta.
Depois de escrever a resposta, usamos self.escritor.drain() que é uma outra corotina. Esta nova corotina não completa até o que o buffer de escrita seja enviado. Desta forma, podemos garantir que os dados foram enviados (ainda que não possamos ter certeza se estes foram recebidos pelo cliente) antes de continuarmos. Como usamos yield from com esta corotina, a execução é suspensa até que o drain seja completado.

Como usamos Python 3.4, os dados são do tipo byte e não string. A função byteToStr converte de bytes para string, usando a codificação UTF-8. Esta função será apresentada no programa common.py, compartilhando rotinas úteis tanto para nosso cliente quanto para nosso servidor. Para facilitar o processamento de comandos, retiramos os espaços em branco do início e fim do comando, inclusive enter (CR) e LF, e convertemos o resultado para minúsculas com lower. Se o comando for igual a "sair", chamamos o método close de self.transporte para encerarmos a conexão. Veja que ao fecharmos a conexão, finalmente retornamos como em uma função normal, utilizando return e terminando assim a execução de nossa corotina gerencia.

O método connection_lost é chamado quando a desconexão do cliente é detectada. O número de conexões ativas é decrementado e o método connection_lost da superclasse é chamado. O parâmetro exp contém None caso seja uma desconexão normal ou a exceção em caso de erro. Neste exemplo não estamos tratando os possíveis erros para nos concentrarmos no asyncio.

Já o método data_received é chamado sempre que dados forem recebidos pelo transporte. Os dados recebidos são passados como parâmetro (dados). Aqui, incluí algumas funções de debug que exibem os dados em formato hexadecimal, string e UTF-8. Estas funções são necessárias para verificarmos se os dados estão chegando no formato esperado. Você pode executar um teste com um programa de telnet clássico, como o Putty no Windows, mas não esqueça de desativar a opção de negociação do protocolo, para evitar que comandos que não interpretamos sejam enviados, ou simplesmente, teste com o cliente que é apresentado logo abaixo.

Um detalhe muito importante de data_received é a chamada do método self.leitor.feed_data, que envia os dados para o leitor, responsável por quebrar os dados em linhas.

Em nosso programa principal, obtemos o loop de eventos com get_event_loop() e criamos uma corotina que inicializa nosso servidor com loop.create_server. Em create_server, informamos o endereço que nosso servidor irá escutar (ip e porta). Veja que a classe EchoServer foi passada como protocol factory.

Ao chamarmos loop.run_until_complete(coro), o loop de eventos roda até que a corotina criada pelo create_server termine, retornando um objeto servidor, utilizado para parar o servidor e para ter acesso a todas as conexões, mas isso fica para outro post.

Chamamos loop.run_forever() para ativar nosso servidor. Para desativá-lo, digite CTRL+C.

Neste ponto, o endereço 127.0.0.1, porta 8888 estará recebendo conexões. Quando uma conexão for recebida, uma nova instância de EchoServer será criada. Ao se estabelecer a conexão o método connection_made será chamado e ativará uma corotina gerencia para gerenciar a recepção e o envio de linhas de comandos. O método data_received é chamado sempre que novos dados forem recebidos (seja uma linha completa ou não). O método connection_lost é chamado quando o cliente se desconectar.

Vejamos o código fonte de common.py:

import string

def byteToHex(data, sep=" "):
    return sep.join("{0:02X}".format(x) for x in data)

def strPrintable(data, sep=" "):
    return sep.join("{0:2s}".format(chr(s) if chr(s) in string.printable and s>30 else ".") for s in data)

def strToByte(s, encoding ="utf-8"):
    return s.encode(encoding)

def byteToStr(data, encoding ="utf-8"):
    return data.decode(encoding, errors="replace")

E o código fonte de nosso cliente.py:

import asyncio
import time
from common import *

class EchoClient(asyncio.streams.FlowControlMixin):        
    def connection_made(self, transporte):        
        peername = transporte.get_extra_info('peername')
        print('Conectado à {}'.format(peername))                
        self.transporte = transporte
        self.leitor = asyncio.StreamReader()
        self.leitor.set_transport(self.transporte)
        self.escritor = asyncio.StreamWriter(transport=self.transporte, protocol=self, reader=self.leitor, loop=asyncio.get_event_loop())        
        asyncio.async(self.gerencia())
        self.feito = asyncio.Future()
    
    @asyncio.coroutine
    def gerencia(self):        
        for x in range(10):                        
            self.escritor.write(strToByte("Alô\r\n"))
            yield from self.escritor.drain()

            dados = yield from self.leitor.readline()            
        self.escritor.write(strToByte("sair\r\n"))
        yield from self.escritor.drain()                                         
        self.transporte.close()
        self.feito.set_result(True)

    def connection_lost(self, exp):        
        print("Conexão perdida")
        super().connection_lost(exp)

    def data_received(self, dados):                
        print('dados recebidos: {0}'.format(byteToHex(dados)))
        print('      recebidos: {0}'.format(strPrintable(dados)))
        print('         string: {0}'.format(byteToStr(dados)))
        self.leitor.feed_data(dados)

        
loop = asyncio.get_event_loop()
coro = loop.create_connection(EchoClient, '127.0.0.1', 8888)
transporte, protocolo = loop.run_until_complete(coro)

try:
    loop.run_until_complete(protocolo.feito)
except KeyboardInterrupt:
    pass
finally:    
    loop.close()

Execute o servidor e depois o cliente, cada em uma janela ou terminal diferente. Veja que o cliente termina sua execução após enviar 10 vezes o comando Alô e sair. Execute várias vezes o cliente e veja que o servidor continua ativo. Experimente aumentar o número de comandos de 10 para 100 no cliente e reexecute. Tente executar a partir de uma terceira janela outro cliente simultaneamente. Observe que conseguimos implementar um cliente e um servidor TCP/IP em um pouco mais de 100 linhas de código em Python. Um servidor capaz de atender vários clientes sem utilizar múltiplos threads. Você pode comentar ou remover os prints que não precisar, eles servem apenas para debugar.

O código do cliente é muito parecido com o código do servidor. A principal mudança é o método gerencia e o atributo self.feito, criado com asyncio.Future(). Vejamos a criação da instância de nosso cliente. A função create_conection é na realidade uma corotina que recebe o protocolo (no caso EchoClient), o ip e a porta do servidor. Ao chamarmos loop.run_until_complete(coro), uma tupla com o transporte e o protocolo é retornada. Este retorno é importante, pois precisamos ter acesso a instância de EchoClient criada para gerenciar nossa conexão, no caso protocolo. Com a instância de EchoClient retornada em protocolo, podemos rodar o loop até que feito seja marcada como finalizada: loop.run_until_complete(protocolo.feito). Esta etapa é importante, pois devemos executar o loop até que o cliente tenha tempo para terminar seu trabalho. Veja que no final de gerencia, marcamos self.feito como concluída: self.feito.set_result(True).

Se você quiser testar o servidor com vários clientes simultaneamente, adicione a seguinte função ao código de cliente.py e modifique o programa principal para:

@asyncio.coroutine
def roda_varias(vezes):
    pendente = []
    for t in range(vezes):
        pendente.append(asyncio.async(loop.create_connection(EchoClient, '127.0.0.1', 8888)))
    for y in pendente:            
        transporte, protocolo = yield from y
        yield from asyncio.wait_for(protocolo.feito, None)

loop = asyncio.get_event_loop()
coro = loop.create_connection(EchoClient, '127.0.0.1', 8888)
client = loop.run_until_complete(coro)

try:
    loop.run_until_complete(roda_varias(100))
except KeyboardInterrupt:
    pass
finally:    
    loop.close()

Você também pode utilizar start_server e open_connection para receber diretamente uma tupla com StreamReader e StreamWriter, mas estes exemplos você pode encontrar na documentação do Python.

No próximo artigo, uma nova classe comum será usada para gerenciar os protocolos e outra forma de instanciação será passada para realizar uma comunicação entre vários clientes.



Nenhum comentário: