Olá no último vídeo estávamos fazendo o desacoplamento do do R Zone de Best practices fazendo com que pipeline de consulta e o pipeline de processamento estejam desacoplados cada um e o pipeline Esse foi o resultado que conseguimos por enquanto temos esse pipeline e esse pipeline que são justamente os pipeline de consulta e de processamento no pipeline de processamento falta um uma medida que é para poder enviar o payload de entrada para o pipeline de reprocessamento aqui no Exception nós temos um get input pipeline input que para ser inserido na base de dados temporária de reprocessamento
entretanto nós não configuramos esse pipeline input ainda E é isso que vamos fazer nesse momento então aqui na no meu pipeline principal eu vou criar um um mock vou utilizar um mock para fazer isso e o session para poder colocar esse elemento dentro do pipeline input então aqui eu vou cortar Vou salvar e depois eu continuo para dentro no meu do meu processamento aqui dentro eu vou colocar agrupar pipeline input nós vamos criar esse objeto aqui então aqui temos peline input salvando toda a mensagem que cheg cheguei presente mess de ponto cifrão logo depois temos
um session que vai salvar esse pipeline input pipeline copiar esse valor tirar esses campos não são correspondentes colar o input salvar agora a gente garante que inut sendo salvo exception a gente vai conseguir enviar esse registro para reprocessamento entretanto ao fazer isso eu tenho que eu acabo quebrando esse grupo Person info aqui então precisa mudar ele para que ele pegue agora não message pon cifrão mas message ponto pipeline input pon cifrão uma vez que a gente colocou o pipeline input como um objeto que carrega toda a mensagem e apenas Essa alteração é o suficiente para
fazer com Que ele continue normalmente como estava fazendo anteriormente Então vou salvar o pipeline e aqui por enquanto a gente já finaliza as atrações no processamento o que a gente precisa fazer agora é começar a criar o pipeline de reprocessamento e o pipeline de evento de erro então novamente por enquanto estamos com dois pipelines que são os pipelines de consulta e o pipeline agora vamos criar um terceiro pipeline que é o pipeline de reprocessamento genérico da plataforma com o novo Canvas de novo pipeline a primeira coisa que eu vou fazer configurar para um Trigger de
sched a cada 5 minutos por exemplo e aqui a gente vai começar puxando a nossa base de dados e vimos estamos salvando né no pipeline de processamento e a gente está utilizando o Cassandra para fazer isso vou buscar o Cassandra vou conectar me trig vou iniciar a configuração aqui a gente vai fazer uma operação WS Cassandra vai select vamos pegar todos os dados aqui o conne string serur esse caso eu vou mudar o step osos a tabela para que seja a tabela correspondente ao pipeline de reprocessamento a tabela de reprocessamento aqui na Carry Justamente esse
valor aqui nós vamos buscar todos os registros nessa tabela de reprocessamento e aqui a gente precisa fazer um filtro que vai ser por enquanto o air pipeline fo igual a test mode por enquanto porque estamos fazendo um desenvolvimento mas esse valor aqui se torna o valor dinâmico do nome do pipeline por exemplo colocamos um fing que consigamos fazer esse tipo de filtro aqui em que o pipeline tem um certo nome contrrio el não conseguir fazer ess filtro Então vou confirmar e aqui eu V iniciar a verifica se deu sucesso F se troue registros ou não
por meio do Choice vou buscar conector choice e imediatamente seguindo as boas práticas eu também vou buscar o conector log para acessar juntamente com Choice então na primeira condição do Choice vamos configurar como caso de não retorn a registros e a verificação vai ser aqui dentro Eu vou procurar para uma chave chamada Row count Row count e quando ela for exatamente igual a zero significa que não retornou registros vou copiar esse step n porque eu vou colocar meu log com o mesmo nome então não retornou registros log não retorn registros ele é um log level
info não tem problema em não retornar nada só significa que não tem erro a serem reprocessados Vou confirmar e agora vou iniciar a configuração da minha segunda condição e é quando retornou a registros Então vou copiar e colar esse log aqui vou gerar minha segunda condição que será um other Rise então aqui vai ser retornou dados retornou registros e aqui o tipo vai ser a riser visto que eu já fiz a condição na condição de cima novamente o log descritivo também é info e aqui ele retornou confirmei vou reorganizar meu pipeline e aqui temos dois
caminhos o caminho que não retorna a registros não tem nada se fazer a gente pode parar a execução aqui porque ele tá simplesmente sem dados para serem reprocessados eu vou simplesmente falar que não existem dados para serem reprocessados con de Nito Então vamos criar aqui uma saída vou chamar Deão de saída por exemplo pode ser tratamento de output pode ser o nome e o desenvolvedor achar adequado e aqui eu vou colocar o atributo como mensagem mensagem vai ser não há registros serem reprocessados não há registros para serem reprocessados Vou confirmar e aqui caso eu teste
essee pipeline deira tá como vazio aqui ele tem que cair no caminho de cima aparentemente ele está caindo no de baixo se isso tá acontecendo é porque o meu componente Cassandra ele não está retornando rout ele tá retornando apenas daa então que a gente vai ter que mudar a condição para que a gente consiga entender como é que vamos fazer essa verificação a nova verificação funcionou da seguinte forma aqui nós verificamos na primeira posição desse elemento se possui um atributo chamado error caso possua significa que nós temos registro então aqui nós vamos negar essa condição
com o interrogação então novamente caso dentro do Array deira na posição dera na posição zero possua um atributo de erro significa que existem dados nesse momento a gente vai negar então não existem dados com negativo de existem dados E aí ele vai chegar para condição de cima então confirmando e executando quando não há dados ele não vai retornar não vai encontrar aquele atributo error E aí ele iria no caminho de cima por exemplo agora que a gente já consegue fazer essa condição normalmente Vamos pro caminho de baixo e aqui a gente vai fazer um forit
para poder fazer o processamento de cada um desses erros especificamente então forit erro por exemplo aqui vamos rar o arraya vou confirmar vamos começar as configurações aqui dentro assim como no caso anterior eu vou colar todos esses conectores e vou esp passar um por um explicando o que está acontecendo Então quando chega algum erro vindo do do pipeline de processamento ele vai passar pelo Cassandra e o Cassandra vai retornar diversos erros e vai ser iterado pel por esse forid um ponto importante é que o Cassandra ele vai entregar os atributos que foram inseridos em formato
de string então a gente precisa transformar esse string em objeto de Z com esse conector aqui a gente vai transformar o pad em um objeto agora com pad já em objeto nós vamos fazer o increase r Number A gente vai pegar esse valor aqui vai acrescentar vai incrementar em um toda vez que entrar no no looping do forid para dizer que ele já está tentando uma nova tentativa logo após fazer o incremento do R Number para que esse processamento não fique infinito nós vamos colocar aqui no nosso pad a execution Key desse registro aí depois
que a gente faz o processamento novamente nós vamos poder pegar esse execution Key e fazer a deleção do Cassandra então nós colocamos o execution Key na sessão entramos no bloco de verificação de R tentativa então aqui temos o caso de caso o número de tentativa seja maior do que o limite maior ou igual então ele vai simplesmente publicar um evento de erro que a gente vai construir caso contrário a gente vai reprocessar o pipeline por meio do pipeline Executor ou por meio de um evento eh que corresponde ao evento do pipeline publisher né então aqui
nos dois casos caso o número de tentativas ainda não tenha atingido o limite enviamos para reprocessamento caso contrário a gente publica um erro mas aqui como estamos dentro de um Block execution as duas saídas vão concatenar nesse session a gente vai buscar essa mesma chave e assim a gente vai poder fazer a deleção desse registro uma vez que ele já foi processado para deletar esse registro a gente utiliza o candra novamente e a gente utiliza mesma tabela de reprocessamento com delete onde a execution Key execution Key que está presente no payload que a gente busc
logo apó a gente faz uma verificação se ação acorreu de maneira adequada e coloca um set sucess caso tenha corrido tudo da maneira desejada Então vou salvar ess pipeline vou colocar o nome dele como sched reprocessamento grupo v o def e o último passo para configurar esse pipeline é colocar um log no Exception e colocar um tratamento de erro para esse forit aqui um de saída para esse forit então aqui vamos entrar no desse pipeline colocar um log e aqui l a gente não vai configurar mais tratamento de erro Vamos só colocar log de erro
e o step name vai ser erro no for do reprocessamento essa mensagem colar aqui também nesse caso específico eu vou poder utilizar toda mensagem para entender o que aconteceu diretamente pela aa de monitoramento confirmando agora a gente vai vir vai criar um dies de saída também depois do dies generator eu vou fazer o seguinte nessa mensagem antes da mensagem eu vou colocar um atributo chamado [Música] sumário vai receber message P cifrão que é justamente o resultado dos summarizes do forit então aqui dentro desse sumário vai ter total success failed e a mensagem vai ser os
registros foram enviadas para reprocessamento aqui eu tô acabei colocando um s os registros foram enviados para reprocessamento vou confirmar eu vou salvar esse pipeline novamente e aqui a gente termina a construção de reprocessamento próximo passo agora é criar o evento de erro nesse evento de erro nós vamos associar os erros que acontecem no scheduler de consulta e vamos associar também o erro que acontece aqui dentro a gente não vai reprisar mais mais registro então vamos publicar um evento de erro então nos vemos no próximo vídeo onde nós vamos construir o evento de erro e vamos
fazer essa alteração nos outros pipelines