Image by Anderson for eQuest

Creating a complete Big Data ingestion solution using Google Cloud

Big Data Ingestion Project: the Google Cloud ecosystem with Storage, Functions, Composer, Dataflow & BigQuery

Motivation: The client has a site that generates a 1 GB log file in CSV format every minute.

Each file contains an event log for every browsing cycle of each customer, from the initial access to the site through to payment.

Challenge: Design a scalable solution that reads these CSV files and extracts relevant information for the client, such as:

Important KPIs such as conversion rate and shopping basket abandonment rate, among others.

The scope of this document is divided into the following topics:

1. Proposed Architecture

2. Motivation of the Proposal

3. Pipeline execution

4. Implementation

5. Proposed evolution

6. Who am I?

7. References

1. Proposed Architecture

1.2 Main Flow

The architecture has the following main flow:

  1. The index.js function in Google Cloud Functions watches the your-project-name-on-google bucket in Google Storage for newly added files.
  2. This function has a trigger that fires whenever a new file lands in the bucket, which starts the simple_load_dag.py orchestrator in Google Composer (a minimal sketch of this DAG is shown right after this list).
  3. Google Composer runs the Python script storage-to-dataflow-to-bigquery.py in Google Dataflow. This is our pipeline: it reads the CSV files added to the bucket, transforms them into the expected format and loads them into the BigQuery table.
  4. Once every line and field of the CSV has been inserted into the BigQuery table your-project-name-in-google:dataNavigationDataSet.RAW_DATA_NAVIGATION, Google Composer moves the CSV to the bucket your-project-name-in-google-bucket-navi-out.
  5. The SalesKPI.sql view created in BigQuery exposes the indicators relevant to the project.
  6. This view is accessed by Google Data Studio, which generates the reports and metrics.
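
To make steps 2 to 4 more concrete, here is a minimal sketch of what an orchestrator such as simple_load_dag.py can look like. It is not the project's actual DAG: the DAG id, task ids, file locations and bucket names below are assumptions, and it uses the Airflow 1.x contrib operators available in Cloud Composer environments of that generation.

# simple_load_dag.py -- illustrative sketch only; ids, paths and bucket names are assumptions.
from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator
from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator
from airflow.utils import dates

default_args = {
    'start_date': dates.days_ago(1),
    # Picked up by DataFlowPythonOperator as the Dataflow job defaults.
    'dataflow_default_options': {
        'project': 'your-project-name-on-google',
        'temp_location': 'gs://your-project-name-on-google-bucket-navigation/tmp/',
    },
}

# schedule_interval=None: the DAG runs only when the Cloud Function triggers it.
with DAG('simple_load_dag', default_args=default_args, schedule_interval=None) as dag:

    # Run the Apache Beam pipeline on Dataflow (the script sits in the Composer DAGs folder).
    run_pipeline = DataFlowPythonOperator(
        task_id='run_dataflow_pipeline',
        py_file='/home/airflow/gcs/dags/storage-to-dataflow-to-bigquery.py',
        options={
            'input': 'gs://your-project-name-on-google/data_navigational*.csv',
            'output': 'your-project-name-on-google:dataNavigationDataSet.RAW_DATA_NAVIGATION',
        },
    )

    # After a successful load, move the processed CSVs to the "out" bucket.
    move_processed_files = GoogleCloudStorageToGoogleCloudStorageOperator(
        task_id='move_processed_files',
        source_bucket='your-project-name-on-google',
        source_object='data_navigational*.csv',
        destination_bucket='your-project-name-in-google-bucket-navi-out',
        move_object=True,
    )

    run_pipeline >> move_processed_files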

1.3 Brief description of each service, in call order, with Google's own marketing pitch for each.

  • Google Storage, our storage, where we place the CSV files for processing:

"Unified object storage for developers and enterprises."

  • Google Cloud Functions, which detects new files in the bucket and triggers the orchestration:

"The easiest way to run and scale code in the cloud."

  • Google Composer, our orchestrator:

"A fully managed workflow orchestration service built on Apache Airflow."

  • Google Dataflow, where our pipeline runs:

"Simplified stream and batch data processing, with equal reliability and expressiveness."

  • Google BigQuery, where the processed data is stored and queried:

"A fast, highly scalable, cost-effective and fully managed cloud data warehouse for analytics."

  • Google Data Studio, which turns the data into reports and dashboards:

"Unlock the power of your data with interactive dashboards and beautiful reports that inspire smarter business decisions."

2. Motivation of the Proposal

For this architecture we prioritized not installing any library or tool locally; everything is installed in the cloud.
It covers the client's requirements and others such as on-demand scalability, a simplified and open-source programming model, cost control, and automatic resource management, which, according to Google, offers:

“Practically unlimited resources.”

2.1 Personal Motivation for Architecture

2.1.1 Why build the whole solution in the cloud and not locally?

As my computer is not very good, running any of this locally would have been painful.

2.1.2 Why use Google Cloud?

I chose Google because I already have plenty of contact with the Firebase Cloud stack, including Functions, from a personal project with Ionic. Also because I won a voucher for 6 months of free use of all Google Cloud services.

2.1.3 Why use Python and not Java?

Because I already have plenty of experience with Java, and for this type of project Python gives me cleaner code.

2.1.4 Why use Google DataStudio?

I used it here to simplify our example; its report customization is very limited. I would recommend solutions such as Tableau, SAP BO or SAS, among others, depending on the budget.

3. Pipeline execution

Follow the steps below, where we simulate the entire pipeline execution by inserting a file into the bucket your-project-name-in-google, from the addition of the CSV to the bucket until the view in BigQuery is updated with the indicators.

3.1 Access the Cloud Shell terminal.

3.2. Run the following command to copy a CSV from the temp bucket to the bucket monitored by Cloud Functions:

gsutil cp -p gs://your-project-name-in-google-bucket-navi-temp/data_navigational_p1.csv gs://your-project-name-in-google/data_navigational_p1.csv

3.3. Watch the trigger fire in Cloud Functions.

3.4. Check that Google Composer is up and, if you want, look at its log.

3.5. Note that the pipeline has started in Google Dataflow; wait for the job to finish.

3.6. Check the updated your-project-name-in-google.dataNavigationDataSet.RAW_DATA_NAVIGATION table, as well as the KPIs.

3.7. Check that the processed files have been removed from the bucket your-project-name-in-google and moved to the bucket your-project-name-in-google-bucket-navi-out.
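
If you prefer to check the load from code instead of the console, the sketch below (assuming the google-cloud-bigquery client library is installed; the table name is the one used throughout this article) counts the distinct visits per page type:

# check_load.py -- quick sanity check of the loaded data; a sketch, not part of the project.
from google.cloud import bigquery

client = bigquery.Client(project='your-project-name-in-google')

query = """
    SELECT page_type, COUNT(DISTINCT visit_id) AS visits
    FROM `your-project-name-in-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
    GROUP BY page_type
    ORDER BY visits DESC
"""

# Print how many distinct visits reached each page type.
for row in client.query(query).result():
    print(row.page_type, row.visits)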

4. Implementation

4.1 Dataflow

Dataflow console screens during job execution.

Example CSV that should be imported into your Google Storage bucket:

dados_navigacionais_100.csv

Example Python file with the job implementation to run in Google Dataflow:

storage-to-dataflow-to-bigquery.py

With the following important points for customizing the script:

Line 121: the separation of the fields in the CSV and the type of each one, used to create the database table.

schema='load_timestamp:STRING,ip:STRING,visit_id:STRING,device_type:STRING,url_location:STRING,page_type:STRING,search_query:STRING,product_id:STRING,site_department_id:STRING,product_unit_price:STRING,freight_delivery_time:STRING,freight_value:STRING,cart_qty:STRING,cart_total_value:STRING',

The types are the same as those in the BigQuery table schema.

Line 38: takes each CSV line and transforms it into the format BigQuery understands, declaring the order of the fields in the CSV.

('load_timestamp', 'ip', 'visit_id', 'device_type', 'url_location', 'page_type',
 'search_query', 'product_id', 'site_department_id', 'product_unit_price',
 'freight_delivery_time', 'freight_value', 'cart_qty', 'cart_total_value'),
values))

Line 104: CREATE_IF_NEEDED creates the table if it does not exist. Line 106: WRITE_TRUNCATE deletes the table if it exists and then inserts. This is only for the example; in production it should be WRITE_APPEND.

create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
# Deletes all data in the BigQuery table before writing.
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))

Here you can see more details of these parameters: https://beam.apache.org/documentation/io/built-in/google-bigquery/
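
To see how these pieces fit together, below is a minimal, self-contained sketch of a pipeline with the same shape. It is not the project's storage-to-dataflow-to-bigquery.py: it assumes a comma-separated CSV with a header row, uses the field order and schema described above, and leaves out error handling.

# minimal_csv_to_bigquery.py -- illustrative sketch, not the project's actual script.
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Field order must match the column order in the CSV (see "Line 38" above).
FIELDS = ('load_timestamp', 'ip', 'visit_id', 'device_type', 'url_location',
          'page_type', 'search_query', 'product_id', 'site_department_id',
          'product_unit_price', 'freight_delivery_time', 'freight_value',
          'cart_qty', 'cart_total_value')

# Same field names and types as the BigQuery schema (see "Line 121" above).
SCHEMA = ','.join('{}:STRING'.format(name) for name in FIELDS)


def parse_line(line):
    """Split one CSV line and map it to the dict format WriteToBigQuery expects."""
    return dict(zip(FIELDS, line.split(',')))


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', required=True)   # e.g. gs://bucket/data_navigation*.csv
    parser.add_argument('--output', required=True)  # e.g. project:dataset.table
    known_args, pipeline_args = parser.parse_known_args(argv)

    with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:
        (p
         | 'Read CSV' >> beam.io.ReadFromText(known_args.input, skip_header_lines=1)
         | 'Parse line' >> beam.Map(parse_line)
         | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
             known_args.output,
             schema=SCHEMA,
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             # WRITE_APPEND in production; WRITE_TRUNCATE only for this example.
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))


if __name__ == '__main__':
    run()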

Example of running the pipeline from the Google Cloud Shell terminal:

python storage-to-dataflow-to-bigquery.py --input gs://your-project-name-on-google-bucket-navigation/data_navigation* --output your-project-name-on-google:dataNavigationDataSet.RAW_DATA_NAVIGATION --runner DataflowRunner --project your-project-name-on-google --job_name job-name-001 --temp_location gs://your-project-name-on-google-bucket-navigation/tmp/

4.2 BigQuery

The following view was created, your-project-name-in-google:dataNavigationDataSet.SalesKPI, with the most relevant indicators:

SELECT 'Shopping Cart Abandonment' as Description, basket.losing as q1, thankyou.buy as q2, 100 - ROUND((thankyou.buy / basket.losing) * 100, 2) as rate FROM
(SELECT count(distinct visit_id) as losing FROM `your-project-name-in-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'basket') basket
CROSS JOIN
(SELECT count(distinct visit_id) as buy FROM `your-project-name-in-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'thankyou') thankyou
union all
SELECT 'Conversion rate from home', home.visit, thankyou.buy, ROUND((thankyou.buy / home.visit) * 100, 2) as conversion_rate_home FROM
(SELECT count(distinct visit_id) as visit FROM `your-project-name-in-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'home') home
CROSS JOIN
(SELECT count(distinct visit_id) as buy FROM `your-project-name-in-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'thankyou') thankyou
union all
SELECT 'Conversion rate from Search', home.visit, thankyou.buy, ROUND((thankyou.buy / home.visit) * 100, 2) as conversion_rate_search FROM
(SELECT count(distinct visit_id) as visit FROM `your-project-name-in-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'search') home
CROSS JOIN
(SELECT count(distinct visit_id) as buy FROM `your-project-name-in-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'thankyou') thankyou
union all
SELECT 'Conversion rate from Product', home.visit, thankyou.buy, ROUND((thankyou.buy / home.visit) * 100, 2) as conversion_rate_product FROM
(SELECT count(distinct visit_id) as visit FROM `your-project-name-in-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'product') home
CROSS JOIN
(SELECT count(distinct visit_id) as buy FROM `your-project-name-in-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'thankyou') thankyou
union all
SELECT 'No payment - Dismiss', home.visit, thankyou.buy, 100 - ROUND((thankyou.buy / home.visit) * 100, 2) as dismiss_rate FROM
(SELECT count(distinct visit_id) as visit FROM `your-project-name-in-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'payment') home
CROSS JOIN
(SELECT count(distinct visit_id) as buy FROM `your-project-name-in-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'thankyou') thankyou
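
If you prefer to version this view as code instead of creating it through the BigQuery UI, the sketch below is one possibility. It assumes the SELECT above is saved locally as SalesKPI.sql and that the google-cloud-bigquery client library is installed; the script itself is illustrative, not part of the project.

# create_view.py -- recreates the SalesKPI view from the SQL above; illustrative sketch only.
from google.cloud import bigquery

client = bigquery.Client(project='your-project-name-in-google')

# The SELECT statement shown above, saved locally as SalesKPI.sql.
with open('SalesKPI.sql') as f:
    view = bigquery.Table('your-project-name-in-google.dataNavigationDataSet.SalesKPI')
    view.view_query = f.read()

# Emulate CREATE OR REPLACE VIEW: drop any existing view, then create it again.
client.delete_table(view, not_found_ok=True)
client.create_table(view)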

4.3 DataStudio

Example of possible dashboards and reports that can be created:

Main Dashboard

5. Proposed Evolution

In a perfect world this would evolve into the following workflow; if you want to contribute in some way, commits are welcome.

https://github.com/samueljb/google-cloud-solution-escalavel-importacao-csv

6. Who am I?

I am Samuel Balbino, passionate about software development and data manipulation.

If you want to be my virtual friend, have questions or need some consulting, just look me up on the social networks below:

https://linktr.ee/samueljboficial

7. References

gcp-batch-ingestion-pipeline-python

https://github.com/servian/gcp-batch-ingestion-pipeline-python

Triggering DAGs (workflows)

https://cloud.google.com/composer/docs/how-to/using/triggering-with-gcf

Cloud Composer Examples

https://github.com/GoogleCloudPlatform/professional-services/tree/master/examples/cloud-composer-examples

Triggering a Cloud Composer DAG from Cloud Functions

https://github.com/GoogleCloudPlatform/nodejs-docs-samples/tree/058f03387f7acbec25b2ac363c3fe584572e1777/functions/composer-storage-trigger

Orchestrating jobs with Apache Airflow / Cloud Composer

https://hcoelho.com/blog/63/Orchestrating_jobs_with_Apache_Airflow_Cloud_Composer
