Image by Anderson for eQuest

Criando uma solução completa de BigData utilizando o Google Cloud

Google Storage, Function, Composer, DataFlow & BigQuery

Solução BigData escalável desde a importação até a análise e extração de indicadores.

O escopo desse documento se divide nos seguintes tópicos.

1. Arquitetura Proposta

2. Motivação

3. Execução da pipeline

4. Implementação

5. Proposta de evolução

6. Quem sou eu?

7. Referências

1. Arquitetura Proposta

1.2 Fluxo Principal

Vamos direto ao ponto, a arquitetura tem o seguinte fluxo principal:

  1. A função index.js no Google Cloud Functions fica monitorando um bucket no google Google Storage a procura por novos arquivos adicionados.
  2. Essa função tem uma trigger que é informada quando um novo arquivo é adicionado no bucket, logo, acionando o orquestrador simple_load_dag.py no Google Composer.
  3. Google Composer executa o script python storage-to-dataflow-to-bigquery.py no Google DataFlow, que é a nossa pipeline, que pega os arquivos adicionados no bucket e os transforma no formato conhecido, adicionando na tabela do BigQuery.
  4. Inserindo cada linha na tabela do BigQuery, dando tudo certo, o Google Composer, move o arquivo outro bucket.
  5. A view SalesKPI.sql criada BigQuery possui possui alguns exemplos de indicadores.
  6. Essa view pode ser acessada pelo Google DataStudio, aonde é possivel relatório e os indicadores.

1.3 Breve descrição de cada step na sua ordem de chamada, com seu devido google marketing.

  • Google Storage, nosso storage aonde colocaremos nossos arquivos para serem processados.

Armazenamento unificado de objetos para desenvolvedores e empresas

A maneira mais fácil de executar e escalonar o código na nuvem

Um serviço de orquestração do fluxo de trabalho totalmente gerenciado criado no Apache Airflow

Processamento simplificado de dados de stream em lote, com a mesma confiabilidade e expressividade

Um armazenamento de dados na nuvem rápido, altamente dimensionável, econômico e totalmente gerenciado para análise com machine learning.

  • DataStudio, nossa ferramenta para criação de relatórios.

Desbloqueie o poder de seus dados com painéis interativos e relatórios bonitos que inspiram decisões de negócios mais inteligentes.

2. Motivação

Para essa essa arquitetura priorizamos não obrigar instalar nenhuma biblioteca ou ferramenta localmente, tudo será instalado na núvem.
Cobre requisitos como escalabilidade sob demanda, modelo de programação simplificado e opensource, controle de custo, gerenciamento automático de recursos, que conforme a google diz:

“Recursos praticamente ilimitados”.

2.1 Motivação Pessoal para Arquitetura

2.1.1 Porque desenhar toda solução na núvem e não localmente ?

Como meu computador não é muito bom, experimentar qualquer solução localmente seria lastimável.

2.1.2 Porque utilizando o Google Cloud ?

Escolhi o google por já ter bastante contato toda solução Cloud do Firebase, inclusive o Functions para um projeto pessoal com IONIC. Também porque eu ganhei um voucher por 6 meses para utilizar todos o serviços do Google Cloud gratuitamente.

2.1.3 Porque utilizar Python e não Java ?

Porque Java eu já tenho bastante experiência e Python para esse tipo de projeto eu teria um código mais limpo.

2.1.4 Porque utilizar Google DataStudio ?

Utilizei aqui para simplificar o nosso exemplo, o customização dos relatórios é muito limitada, indicaria soluções como Tableau, SAP BO, SAS entre outros, dependeria do orçamento.

3. Execução da pipeline

Siga os passos abaixo para simular toda execução da pipeline inserindo um arquivo no storage bucket, entendendo desde a adição do arquivo no bucket até a atualização da view no BigQuery com os indicadores.

3.1 Acesse o cloud terminal shell.

3.2. Execute o seguinte comando para copiar o arquivo da pasta do bucket temp para a pasta que esta sendo monitorada pelo Cloud Functions:

gsutil cp -p gs://seu-projeto-nome-no-google-bucket-navi-temp/arquivo_exemplo.csv gs://"seu-projeto-nome-no-google/arquivo_exemplo.csv"

3.3. Veja o acionamento do trigger no Cloud Functions.

3.4. Veja que o Google Composer esta up , caso queira veja o log.

3.5. Veja que a pipeline foi iniciada no Google Dataflow, aguarde o job finalizar.

3.6. Veja a tabela no BigQuery ser atualizada, assim como os KPIs.

3.7. Veja os arquivos processados serem apagadas do bucket emovidos para o bucket de saída.

4. Implementação

4.1 Dataflow

Telas do Dataflow para execução do job

Exemplo arquivo python com a implementação do JOB para rodar no Google Dataflow:

storage-to-dataflow-to-bigquery.py

Com os seguintes pontos importantes, para customização do script:

Linha 121: A separação dos campos no CSV e os tipos de cada um, para criação da tabela do banco.

schema='fieldName1:STRING,fieldName2:STRING',

Os tipos são os mesmos do SCHEMA das tabelas do BigQuery.

Linha 38: Captura as linhas do CSV e transforma no formato entendível pelo BigQuery, informando a ordem dos campos.

row = dict( zip(('fieldName1', 'fieldName2'),
values))

Linha 104: CREATE_IF_NEEDED cria a tabela caso não exista. Linha 106: WRITE_TRUNCATE apaga a tabela caso exista e insira, esta assim a título de exemplo, mas deveria ser: WRITE_APPEND

create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
# Deletes all data in the BigQuery table before writing.
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))

Aqui você pode ver mais detalhes desse parâmetros: https://beam.apache.org/documentation/io/built-in/google-bigquery/

Exemplo de execução de pipeline terminal shell do google cloud:

python storage-to-dataflow-to-bigquery.py --input gs://seu-projeto-nome-no-google-bucket-navigation/dados_navegacionais* --output seu-projeto-nome-no-google:dataNavigationDataSet.RAW_DATA_NAVIGATION --runner DataflowRunner --project seu-projeto-nome-no-google --job_name job-name-001 --temp_location gs://seu-projeto-nome-no-google-bucket-navigation/tmp/

4.2 BigQuery

Foi criada seguinte view com os indicadores mais relevantes:

SELECT  'Abandono de Carrinho de compras' as Description, basket.losing as q1, thankyou.buy as q2, 100-ROUND((thankyou.buy / basket.losing)*100, 2) as rate  FROM
(SELECT count(distinct visit_id) as losing FROM `seu-projeto-nome-no-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'basket') basket
CROSS JOIN
(SELECT count(distinct visit_id) as buy FROM `seu-projeto-nome-no-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'thankyou') thankyou
union all
SELECT 'Taxa de conversão vindo da home', home.visit, thankyou.buy, ROUND((thankyou.buy / home.visit)*100, 2) as conversion_rate_home FROM
(SELECT count(distinct visit_id) as visit FROM `seu-projeto-nome-no-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'home') home
CROSS JOIN
(SELECT count(distinct visit_id) as buy FROM `seu-projeto-nome-no-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'thankyou') thankyou
union all
SELECT 'Taxa de conversão vindo da Busca', home.visit, thankyou.buy, ROUND((thankyou.buy / home.visit)*100, 2) as conversion_rate_home FROM
(SELECT count(distinct visit_id) as visit FROM `seu-projeto-nome-no-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'search') home
CROSS JOIN
(SELECT count(distinct visit_id) as buy FROM `seu-projeto-nome-no-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'thankyou') thankyou
union all
SELECT 'Taxa de conversão vindo do Produto', home.visit, thankyou.buy, ROUND((thankyou.buy / home.visit)*100, 2) as conversion_rate_home FROM
(SELECT count(distinct visit_id) as visit FROM `seu-projeto-nome-no-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'product') home
CROSS JOIN
(SELECT count(distinct visit_id) as buy FROM `seu-projeto-nome-no-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'thankyou') thankyou
union all
SELECT 'Não pagamento - Desistencia', home.visit, thankyou.buy, 100-ROUND((thankyou.buy / home.visit)*100, 2) as conversion_rate_home FROM
(SELECT count(distinct visit_id) as visit FROM `seu-projeto-nome-no-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'payment') home
CROSS JOIN
(SELECT count(distinct visit_id) as buy FROM `seu-projeto-nome-no-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'thankyou') thankyou

4.3 DataStudio

Exemplo de possíveis dashboards e relatórios que podem ser criados:

Dashboard principal

5. Proposta de Evolução

O mundo perfeito é evoluir para o seguinte workflow, caso queira contribuir de alguma forma, commits são bem vindos.

https://github.com/samueljb/google-cloud-solucao-escalavel-importacao-csv

6. Quem sou eu…

Sou Samuel Balbino, apaixonado por desenvolvimento de softwares e manipulação de dados.

Caso queira ser meu amigo virtual, tenha dúvidas, questionamentos ou precise de alguma consultoria, é só me procurar nas rede sociais abaixo:

https://linktr.ee/samueljboficial

7 Referências

gcp-batch-ingestion-pipeline-python

https://github.com/servian/gcp-batch-ingestion-pipeline-python

Triggering DAGs (workflows)

https://cloud.google.com/composer/docs/how-to/using/triggering-with-gcf

Cloud Composer Examples

https://github.com/GoogleCloudPlatform/professional-services/tree/master/examples/cloud-composer-examples

Triggering a Cloud Composer DAG from Cloud Functions

https://github.com/GoogleCloudPlatform/nodejs-docs-samples/tree/058f03387f7acbec25b2ac363c3fe584572e1777/functions/composer-storage-trigger

Orchestrating jobs with Apache Airflow/Cloud Composer

https://hcoelho.com/blog/63/Orchestrating_jobs_with_Apache_Airflow_Cloud_Composer

Expert in software development and data manipulation.

Love podcasts or audiobooks? Learn on the go with our new app.

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Samuel JB

Samuel JB

Expert in software development and data manipulation.

More from Medium

Cleaning and Injecting data into Microsoft SQL server from Google Colab.

How to Access Encrypted Data on S3 Through Jupyter Kernel

Installing Cloudera Quickstart VM using Docker Hub (on Mac M1)

And updated method of how to do Data Science quickly on GCP (Streamlined)