
Criando uma solução completa de BigData utilizando o Google Cloud
Google Storage, Function, Composer, DataFlow & BigQuery
Solução BigData escalável desde a importação até a análise e extração de indicadores.
O escopo desse documento se divide nos seguintes tópicos.
1. Arquitetura Proposta
2. Motivação
3. Execução da pipeline
4. Implementação
5. Proposta de evolução
6. Quem sou eu?
7. Referências
1. Arquitetura Proposta

1.2 Fluxo Principal
Vamos direto ao ponto, a arquitetura tem o seguinte fluxo principal:
- A função index.js no Google Cloud Functions fica monitorando um bucket no google Google Storage a procura por novos arquivos adicionados.
- Essa função tem uma trigger que é informada quando um novo arquivo é adicionado no bucket, logo, acionando o orquestrador simple_load_dag.py no Google Composer.
- Google Composer executa o script python storage-to-dataflow-to-bigquery.py no Google DataFlow, que é a nossa pipeline, que pega os arquivos adicionados no bucket e os transforma no formato conhecido, adicionando na tabela do BigQuery.
- Inserindo cada linha na tabela do BigQuery, dando tudo certo, o Google Composer, move o arquivo outro bucket.
- A view SalesKPI.sql criada BigQuery possui possui alguns exemplos de indicadores.
- Essa view pode ser acessada pelo Google DataStudio, aonde é possivel relatório e os indicadores.
1.3 Breve descrição de cada step na sua ordem de chamada, com seu devido google marketing.
- Google Storage, nosso storage aonde colocaremos nossos arquivos para serem processados.
Armazenamento unificado de objetos para desenvolvedores e empresas
- Google Cloud Functions, nossa trigger que avisa quando um novo arquivo for adicinado no bucket.
A maneira mais fácil de executar e escalonar o código na nuvem
- Google Composer, nosso orquestrador que fará a gestão de todo workflow.
Um serviço de orquestração do fluxo de trabalho totalmente gerenciado criado no Apache Airflow
- Google DataFlow, responsável pela gestão da pipeline dos dados.
Processamento simplificado de dados de stream em lote, com a mesma confiabilidade e expressividade
- BigQuery, nosso banco de dados.
Um armazenamento de dados na nuvem rápido, altamente dimensionável, econômico e totalmente gerenciado para análise com machine learning.
- DataStudio, nossa ferramenta para criação de relatórios.
Desbloqueie o poder de seus dados com painéis interativos e relatórios bonitos que inspiram decisões de negócios mais inteligentes.
2. Motivação
Para essa essa arquitetura priorizamos não obrigar instalar nenhuma biblioteca ou ferramenta localmente, tudo será instalado na núvem.
Cobre requisitos como escalabilidade sob demanda, modelo de programação simplificado e opensource, controle de custo, gerenciamento automático de recursos, que conforme a google diz:
“Recursos praticamente ilimitados”.
2.1 Motivação Pessoal para Arquitetura
2.1.1 Porque desenhar toda solução na núvem e não localmente ?
Como meu computador não é muito bom, experimentar qualquer solução localmente seria lastimável.
2.1.2 Porque utilizando o Google Cloud ?
Escolhi o google por já ter bastante contato toda solução Cloud do Firebase, inclusive o Functions para um projeto pessoal com IONIC. Também porque eu ganhei um voucher por 6 meses para utilizar todos o serviços do Google Cloud gratuitamente.
2.1.3 Porque utilizar Python e não Java ?
Porque Java eu já tenho bastante experiência e Python para esse tipo de projeto eu teria um código mais limpo.
2.1.4 Porque utilizar Google DataStudio ?
Utilizei aqui para simplificar o nosso exemplo, o customização dos relatórios é muito limitada, indicaria soluções como Tableau, SAP BO, SAS entre outros, dependeria do orçamento.
3. Execução da pipeline
Siga os passos abaixo para simular toda execução da pipeline inserindo um arquivo no storage bucket, entendendo desde a adição do arquivo no bucket até a atualização da view no BigQuery com os indicadores.
3.1 Acesse o cloud terminal shell.
3.2. Execute o seguinte comando para copiar o arquivo da pasta do bucket temp para a pasta que esta sendo monitorada pelo Cloud Functions:
gsutil cp -p gs://seu-projeto-nome-no-google-bucket-navi-temp/arquivo_exemplo.csv gs://"seu-projeto-nome-no-google/arquivo_exemplo.csv"
3.3. Veja o acionamento do trigger no Cloud Functions.
3.4. Veja que o Google Composer esta up , caso queira veja o log.
3.5. Veja que a pipeline foi iniciada no Google Dataflow, aguarde o job finalizar.
3.6. Veja a tabela no BigQuery ser atualizada, assim como os KPIs.
3.7. Veja os arquivos processados serem apagadas do bucket emovidos para o bucket de saída.
4. Implementação
4.1 Dataflow
Telas do Dataflow para execução do job


Exemplo arquivo python com a implementação do JOB para rodar no Google Dataflow:
Com os seguintes pontos importantes, para customização do script:
Linha 121: A separação dos campos no CSV e os tipos de cada um, para criação da tabela do banco.
schema='fieldName1:STRING,fieldName2:STRING',
Os tipos são os mesmos do SCHEMA das tabelas do BigQuery.
Linha 38: Captura as linhas do CSV e transforma no formato entendível pelo BigQuery, informando a ordem dos campos.
row = dict( zip(('fieldName1', 'fieldName2'),
values))
Linha 104: CREATE_IF_NEEDED cria a tabela caso não exista. Linha 106: WRITE_TRUNCATE apaga a tabela caso exista e insira, esta assim a título de exemplo, mas deveria ser: WRITE_APPEND
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
# Deletes all data in the BigQuery table before writing.
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
Aqui você pode ver mais detalhes desse parâmetros: https://beam.apache.org/documentation/io/built-in/google-bigquery/
Exemplo de execução de pipeline terminal shell do google cloud:
python storage-to-dataflow-to-bigquery.py --input gs://seu-projeto-nome-no-google-bucket-navigation/dados_navegacionais* --output seu-projeto-nome-no-google:dataNavigationDataSet.RAW_DATA_NAVIGATION --runner DataflowRunner --project seu-projeto-nome-no-google --job_name job-name-001 --temp_location gs://seu-projeto-nome-no-google-bucket-navigation/tmp/
4.2 BigQuery

Foi criada seguinte view com os indicadores mais relevantes:
SELECT 'Abandono de Carrinho de compras' as Description, basket.losing as q1, thankyou.buy as q2, 100-ROUND((thankyou.buy / basket.losing)*100, 2) as rate FROM
(SELECT count(distinct visit_id) as losing FROM `seu-projeto-nome-no-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'basket') basket
CROSS JOIN
(SELECT count(distinct visit_id) as buy FROM `seu-projeto-nome-no-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'thankyou') thankyou
union all
SELECT 'Taxa de conversão vindo da home', home.visit, thankyou.buy, ROUND((thankyou.buy / home.visit)*100, 2) as conversion_rate_home FROM
(SELECT count(distinct visit_id) as visit FROM `seu-projeto-nome-no-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'home') home
CROSS JOIN
(SELECT count(distinct visit_id) as buy FROM `seu-projeto-nome-no-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'thankyou') thankyou
union all
SELECT 'Taxa de conversão vindo da Busca', home.visit, thankyou.buy, ROUND((thankyou.buy / home.visit)*100, 2) as conversion_rate_home FROM
(SELECT count(distinct visit_id) as visit FROM `seu-projeto-nome-no-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'search') home
CROSS JOIN
(SELECT count(distinct visit_id) as buy FROM `seu-projeto-nome-no-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'thankyou') thankyou
union all
SELECT 'Taxa de conversão vindo do Produto', home.visit, thankyou.buy, ROUND((thankyou.buy / home.visit)*100, 2) as conversion_rate_home FROM
(SELECT count(distinct visit_id) as visit FROM `seu-projeto-nome-no-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'product') home
CROSS JOIN
(SELECT count(distinct visit_id) as buy FROM `seu-projeto-nome-no-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'thankyou') thankyou
union all
SELECT 'Não pagamento - Desistencia', home.visit, thankyou.buy, 100-ROUND((thankyou.buy / home.visit)*100, 2) as conversion_rate_home FROM
(SELECT count(distinct visit_id) as visit FROM `seu-projeto-nome-no-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'payment') home
CROSS JOIN
(SELECT count(distinct visit_id) as buy FROM `seu-projeto-nome-no-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'thankyou') thankyou
4.3 DataStudio
Exemplo de possíveis dashboards e relatórios que podem ser criados:

Dashboard principal

5. Proposta de Evolução
O mundo perfeito é evoluir para o seguinte workflow, caso queira contribuir de alguma forma, commits são bem vindos.
https://github.com/samueljb/google-cloud-solucao-escalavel-importacao-csv

6. Quem sou eu…
Sou Samuel Balbino, apaixonado por desenvolvimento de softwares e manipulação de dados.
Caso queira ser meu amigo virtual, tenha dúvidas, questionamentos ou precise de alguma consultoria, é só me procurar nas rede sociais abaixo:
https://linktr.ee/samueljboficial
7 Referências
gcp-batch-ingestion-pipeline-python
https://github.com/servian/gcp-batch-ingestion-pipeline-python
Triggering DAGs (workflows)
https://cloud.google.com/composer/docs/how-to/using/triggering-with-gcf
Cloud Composer Examples
Triggering a Cloud Composer DAG from Cloud Functions
Orchestrating jobs with Apache Airflow/Cloud Composer
https://hcoelho.com/blog/63/Orchestrating_jobs_with_Apache_Airflow_Cloud_Composer