Criar um fluxo de ETL completo utilizando o pgloader

O objetivo desta postagem é colocar em prática a escrita de um fluxo de ETL completo utilizando uma única ferramenta, o pgloader. Vamos aprender como criar um extrator de dados a partir de um csv disponibilizado pela RFB, é o arquivo de CNPJ das empresas.

Para este tutorial, estou supondo que possui uma instância do postgres instalado (existe um passo-a-passo de instalação aqui)

E o pgloader, o pgloader é simples de instalar e pode ser feito via apt-get. Basta seguir o tutorial disponibilizado pelo projeto.

Pgloader é um projeto opensource desenvolvido por Dimitri Fontaine (que é um dos  core contributor do postgres). A versão 3.x é escrita em Common Lisp. Um dos motivos de ter escolhido esta linguagem para escrever o pgloader é o desempenho.

Vamos fazer isso para o emprecsv. Que pode ser baixado aqui.

O arquivo zipado possui 147MB.

Após deszipado possui 575MB.

Ao abrir o arquivo vemos que possui 7.947.277 linhas.

Podemos escrever um script de extração dos dados a partir layout dos dados abertos, quem disponibiliza o layout é a própria receita, fazendo a transcrição do conteúdo do arquivo temos o seguinte layout para o tipo EMPRESAS.

EMPRESAS

Campo Descrição
CNPJ BÁSICO NÚMERO BASE DE INSCRIÇÃO NO CNPJ.
RAZÃO SOCIAL / NOME EMPRESARIAL NOME EMPRESARIAL DA PESSOA JURÍDICA
NATUREZA JURÍDICA CÓDIGO DA NATUREZA JURÍDICA
QUALIFICAÇÃO DO RESPONSÁVEL QUALIFICAÇÃO DA PESSOA FÍSICA RESPONSÁVEL PELA EMPRESA
CAPITAL SOCIAL DA EMPRESA CAPITAL SOCIAL DA EMPRESA
PORTE DA EMPRESA CÓDIGO DO PORTE DA EMPRESA: 00 – NÃO INFORMADO 01 - MICRO EMPRESA 03 - EMPRESA DE PEQUENO PORTE 05 - DEMAIS
ENTE FEDERATIVO RESPONSÁVEL O ENTE FEDERATIVO RESPONSÁVEL É PREENCHIDO PARA OS CASOS DE ÓRGÃOS E ENTIDADES DO GRUPO DE NATUREZA JURÍDICA 1XXX. PARA AS DEMAIS NATUREZAS, ESTE ATRIBUTO FICA EM BRANCO.

Estes dados estão separados por ;.

Para fazer isso vou utilizar o pgloader mas você pode implementar em qualquer linguagem de programação a mesma lógica.

Preparando o banco de dados

Desejamos colocar o conteúdo do arquivo csv para uma sistema de banco de dados relacional, para facilitar a consulta aos dados e agilizar o processamento.

Neste exemplo eu vou utilizaro postgres como sistema de armazenamento. Então você só vai precisar do criar um novo database e uma nova tabela neste banco de dados, esta tabela vai conter todas as colunas que desejamos importar do arquivo disponibilizado pela RFB.

Para este exemplo eu criei um novo database no sgbd chamado cnpjs . Um DDL que representa o conjunto de dados que desejamos importar seria:

CREATE TABLE empresa (
     CNPJ INTEGER PRIMARY KEY,
     RAZAO VARCHAR(150),
     NATUREZAJURIDICA VARCHAR(150),
     QUALIFICRESPONSAVEL VARCHAR(150),
     CAPITALSOCIAL VARCHAR(32),
     PORTEDAEMPRESA VARCHAR(2),
     ENTEFEDERATIVO VARCHAR(64)
 );

Após escrever o arquivo é possível fazer a "carga" dos dados da seguinte forma, este é o script do pgloader:

LOAD CSV
   FROM 'K3241.K03200Y0.D20108.EMPRECSV'
   HAVING FIELDS
   (
        CNPJ,
        RAZAO,
        NATUREZAJURIDICA,
        QUALIFICRESPONSAVEL,
        CAPITALSOCIAL,
        PORTEDAEMPRESA,
        ENTEFEDERATIVO
   )
   INTO postgresql://<usuario>:<senha>@localhost:5432/cnpjs
    TARGET TABLE empresa
    TARGET COLUMNS (
        CNPJ,
        RAZAO,
        NATUREZAJURIDICA,
        QUALIFICRESPONSAVEL,
        CAPITALSOCIAL,
        PORTEDAEMPRESA,
        ENTEFEDERATIVO
   ) WITH truncate,
        skip header = 0,
        fields optionally enclosed by '"',
        fields escaped by double-quote,
        fields terminated by ';'
    SET work_mem to '32 MB', maintenance_work_mem to '64 MB'

Depois disso, basta colocar este script do pgloader na mesma pasta que posui o arquivo disponibilizado pela RFB.

Para facilitar a implementação, o arquivo deve estar descompactado.

.
└── _datapump
    ├── loadextractor
    └── K3241.K03200Y0.D20108.EMPRECSV

Após fazer esta organização basta executar o comando:

pgloader loaderextractor

Agora o pgloader vai fazer as etapas descritas no arquivo loadextractor.

Após fazer a carga dos dados um report será exibido:

2022-02-17T13:23:49.193815Z LOG report summary reset
             table name     errors       rows      bytes      total time
-----------------------  ---------  ---------  ---------  --------------
                  fetch          0          0                     0.008s
            before load          0          2                     0.040s
-----------------------  ---------  ---------  ---------  --------------
     "public"."empresa"          0    7947277   456.2 MB       2m31.758s
-----------------------  ---------  ---------  ---------  --------------
        Files Processed          0          1                     0.024s
COPY Threads Completion          0          2                  2m31.758s
-----------------------  ---------  ---------  ---------  --------------
      Total import time          ✓    7947277   456.2 MB       2m31.782s

Resolução de problemas

Da primeira vez que eu tentei executar houve um erro durante o processo de importação por causa de um erro de encoding nos dados.

Para resolver eu abri o arquivo K3241.K03200Y0.D20108.EMPRECSV e editei manualmente o arquivo baixado com a lista dos CNPJ's.

Para descobrir onde está o erro eu apenas executei o pgloader e procurei  nas mensagens de log por alguma "pista" de onde poderia estar o erro. Segue o log em que ocorreu um erro:

2022-02-17T13:51:46.104000Z WARNING Target table "\"public\".\"empresa\"" has 1 indexes defined against it.
2022-02-17T13:51:46.104000Z WARNING That could impact loading performance badly.
2022-02-17T13:51:46.104000Z WARNING Consider the option 'drop indexes'.
2022-02-17T13:51:47.427986Z FATAL :UTF-8 stream decoding error on
#<SB-SYS:FD-STREAM for "file K3241.K03200Y0.D20108.EMPRECSV" {1009254313}>:

  the octet sequence #(195 79) cannot be decoded.
2022-02-17T13:51:47.463985Z LOG report summary reset
             table name     errors       rows      bytes      total time
-----------------------  ---------  ---------  ---------  --------------
                  fetch          0          0                     0.000s
            before load          0          2                     0.012s
-----------------------  ---------  ---------  ---------  --------------
     "public"."empresa"          1      64366     4.0 MB          1.348s
-----------------------  ---------  ---------  ---------  --------------
        Files Processed          0          1                     0.016s
COPY Threads Completion          0          2                     1.340s
-----------------------  ---------  ---------  ---------  --------------
      Total import time          1      64366     4.0 MB          1.356s

Então eu abri o arquivo  na linha 64366 e procurei pelo problema, na linha 64367 podemos ver que existe um erro de encoding, então eu substituí o caractere por um correspondente, após fazer isso eu fui capaz de importar com sucesso.

problema encoding

Segue a versão textual da linha que possui o problema:

"41338161";"ESCOLA AGROTECNICA FEDERAL DE CRATO CEARA";"1104";"16";"0,00";"05";"UNI�O"

Segue a versão corrigida:

"41338161";"ESCOLA AGROTECNICA FEDERAL DE CRATO CEARA";"1104";"16";"0,00";"05";"UNIÃO"