Desde que o Deno KV foi lançado, a equipe do Deno está fazendo um excelente trabalho em adicionar mais funcionalidades ao que poderia ser só um simples banco de dados chave-valor, mas agora ele é muito mais do que isso!
Uma das mais novas introduções à caixa de ferramentas do Deno é o uso de filas através do Deno Queues.
Sobre filas
As filas são um conceito muito antigo em programação. A ideia é ter literalmente uma estrutura de dados que te entrega um item de cada vez no modelo FIFO (First In First Out). Filas são excelentes quando temos processos que demoram um pouco para acontecer, mas não são importantes e podem ser feitos em background, por exemplo, envios de emails e notificações.
Com o avanço da computação distribuída, filas se transformaram em algo bastante importante porque agora era possível criar uma fila distribuída entre vários sistemas através de uma rede.
As filas poderiam tanto ser 1:1, ou seja, uma mensagem para um único sistema, ou então um modelo pub/sub (publisher/subscriber), que é o mais comum hoje em dia, onde temos uma fila de mensagens que são ouvidas por um ou mais sistemas, sempre que uma nova mensagem chega, esses sistemas são avisados e recebem a mensagem mais recente.
Mensageria com filas se tornou bastante famosa com o uso do Apache Kafka e RabbitMQ ao longo dos anos.
Deno Queues
Com o Deno não foi diferente! A implementação de filas no Deno KV segue o modelo pub/sub. Mas, ao invés de poder ouvir diversas filas (por exemplo, uma fila de emails, outra fila de webhooks), o Deno Queues só tem uma, então o código fica bastante simples:
using db = await Deno.openKv()
db.listenQueue(async (msg) => {
await mandarEmail(msg.from, msg.to, msg.body)
})
await db.enqueue({
from: 'hello@lsantos.dev',
to: 'suporte@formacaots.com.br',
body: 'A Formação TS é animal!'
})
É simples assim... No momento o KV só consegue ouvir uma única fila, mas é mais do que suficiente para aplicações serverless.
O método enqueue
aceita uma outra propriedade chamada delay
que vai atrasar o envio da mensagem naquela quantidade de milissegundos. No exemplo acima, a mensagem seria entregue instantaneamente, mas imagine que estamos enviando algo daqui a uma hora:
using db = await Deno.openKv()
db.listenQueue(async (msg) => {
await mandarEmail(msg.from, msg.to, msg.body)
})
await db.enqueue({
from: 'hello@lsantos.dev',
to: 'suporte@formacaots.com.br',
body: 'A Formação TS é animal!'
}, { delay: 3_600_000 })
At-least-once delivery
Quando estamos tratando de modelos de fila e outros sistemas distribuídos temos o conceito de QoS, ou Quality of Service. Esse conceito dita como nossas mensagens serão entregues, por exemplo, as filas entregam as mensagens na ordem que foram recebidas, mas outras estruturas podem entregar mensagens e eventos fora de ordem.
Porém, para nós aqui, o que importa é a quantidade de vezes que vamos entregar a mensagem. Existem sistemas que não garantem que uma mensagem seja entregue (UDP por exemplo).
No Deno Queues temos a certeza de que a mensagem vai ser entregue pelo menos uma vez e, se a mensagem falhar ao ser entregue, o mesmo handler vai ser chamado múltiplas vezes (até 5 por padrão).
O mesmo acontece se, por algum motivo você tiver uma exceção. Após isso a mensagem vai ser descartada a não ser que você tenha uma DLQ (Dead-letter queue).
Como já estamos na fila, basta que você mande uma segunda opção para o enqueue
chamada keysIfUndelivered
para criar uma forma de validar as mensagens não entregues. Essa opção aceita um array de strings de duas dimensões (string[][]
) que serão as chaves que vão ser setadas se a mensagem falhar, por exemplo:
const user = { id: 123 }
await db.enqueue(user, {
keysIfUndelivered: [['dlq', 'user', user.id]]
})
Se essa mensagem falhar em ser entregue, uma nova chave dlq:user:123
vai ser criada com o conteúdo da mensagem original.
Duplicidade
Um dos problemas que a gente sempre precisa prestar atenção em sistemas distribuídos é a questão da duplicidade.
É esperado que sistemas baseados em mensagens (eventos) possam:
- Receber um evento uma única vez
- Receber um evento múltiplas vezes
- Receber um evento em ordem
- Receber um evento fora de ordem
Então é super importante que exista uma forma de você criar idempotência, ou seja, independente de quantas vezes você enviar o mesmo evento, ele só vai ser executado uma vez, seja essa idempotência gerada através de uma chave (chamada de idenpotency key) ou através da própria lógica, por exemplo, uma operação que seta um valor para 100, sempre vai setar o valor para 100 independente de quantas vezes for chamada.
No próprio artigo de lançamento do queues o time coloca um exemplo interessante usando nonces, que são basicamente chaves de idempotência:
const db = await Deno.openKv()
db.listenQueue(async (msg) => {
const nonce = await db.get(["nonces", msg.nonce])
if (nonce.value === null) return
// Aqui a mensagem ainda não foi processada
await db.atomic()
// Checamos novamente
.check({ key: nonce.key, versionstamp: nonce.versionstamp })
// Apagamos o nonce
.delete(nonce.key)
// Algum processamento
.sum(["processed_count"], 1n)
.commit()
})
// Enviamos a mensagem
await db.enqueue({ nonce: crypto.randomUUID() })
Perceba que estamos verificando a mensagem duas vezes, a primeira para saber se o valor do nonce existe na chave de nonces, se não, isso significa que já processamos a mensagem, se sim, vamos abrir uma transação atômica, checar novamente se a versão corresponde com a versão que estamos para verificar se a chave não sofreu alguma alteração, já que é possível que outro processo possa ter modificado.
Inclusive, misturar as operações atômicas com o Deno Queues é uma ideia super interessante porque abre portas totalmente novas pra que você possa compor aplicações ainda mais complexas.
Momento FTS!
Se você curtiu esse artigo, eu também tenho um curso completo de TypeScript chamado Formação TypeScript!
Eu te convido a dar uma olhadinha lá se você quiser aprender mais sobre TypeScript comigo e a nossa incrível comunidade com centenas de alunos e alunas!