
Creating a complete Big Data ingestion solution using Google Cloud
Big Data Ingestion Project: the Google Cloud ecosystem: Storage, Functions, Composer, Dataflow & BigQuery
Motivation: the client has a site that generates a 1 GB log file in CSV format every minute.
This file contains an event log for each customer's browsing cycle, from the first access to the site through to payment.
Challenge: design a scalable solution that reads these CSV files and extracts the information relevant to the client, such as:
Important KPIs like conversion rate and shopping basket abandonment rate, among others.
The scope of this document is divided into the following topics:
1. Proposed Architecture
2. Motivation of the Proposal
3. Pipeline execution
4. Implementation
5. Evolution Proposal
6. Who am I?
7. References
1. Proposed Architecture

1.2 Main Flow
The architecture has the following main flow:
- The index.js function in Google Cloud Functions monitors the your-project-name-on-google bucket, watching for new files added to Google Storage.
- This function has a trigger that fires when a new file is added to the bucket, which in turn triggers the simple_load_dag.py orchestrator in Google Composer (a minimal sketch of this DAG follows this list).
- Google Composer runs the Python script storage-to-dataflow-to-bigquery.py in Google Dataflow. This is our pipeline: it takes the CSV files added to the bucket, transforms them into the expected format, and loads them into the BigQuery table.
- Once every line and field of the CSV has been inserted into the BigQuery table your-project-name-on-google:dataNavigationDataSet.RAW_DATA_NAVIGATION, Google Composer moves the CSV to the your-project-name-on-google-bucket-navi-out bucket.
- The SalesKPI.sql view created in BigQuery holds the indicators relevant to the project.
- This view is accessed by Google Data Studio, which generates the reports and metrics.
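The orchestration itself lives in simple_load_dag.py, which is part of the repository linked in section 5. As a rough illustration only, the sketch below shows one way such a DAG could look, assuming the Airflow 1.10 contrib operators shipped with Composer at the time (DataFlowPythonOperator and GoogleCloudStorageToGoogleCloudStorageOperator); the bucket names and paths are the placeholders used throughout this article, not values from the real DAG.

# Sketch of a Composer DAG in the spirit of simple_load_dag.py (not the real file).
import datetime

from airflow import models
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator
from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator

default_args = {'start_date': datetime.datetime(2019, 1, 1)}

with models.DAG('simple_load_dag',
                schedule_interval=None,  # triggered by Cloud Functions, not by a schedule
                default_args=default_args) as dag:

    # Runs storage-to-dataflow-to-bigquery.py on the Dataflow service.
    run_pipeline = DataFlowPythonOperator(
        task_id='run_dataflow_pipeline',
        py_file='gs://your-project-name-on-google-bucket-navigation/storage-to-dataflow-to-bigquery.py',
        dataflow_default_options={
            'project': 'your-project-name-on-google',
            'temp_location': 'gs://your-project-name-on-google-bucket-navigation/tmp/',
        },
        options={
            'input': 'gs://your-project-name-on-google/data_navigational*',
            'output': 'your-project-name-on-google:dataNavigationDataSet.RAW_DATA_NAVIGATION',
        })

    # Once the load succeeds, moves the processed CSVs to the "out" bucket.
    move_to_out = GoogleCloudStorageToGoogleCloudStorageOperator(
        task_id='move_csv_to_out_bucket',
        source_bucket='your-project-name-on-google',
        source_object='data_navigational*',
        destination_bucket='your-project-name-on-google-bucket-navi-out',
        move_object=True)

    run_pipeline >> move_to_out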
1.3 Brief description of each step, in call order, with its due Google marketing tagline:
- Google Storage, our storage, where we place the CSV files for processing.
Unified object storage for developers and enterprises
- Google Cloud Functions, our trigger, which tells us when a new file is added to the bucket.
The easiest way to run and scale code in the cloud
- Google Composer, our orchestrator, which manages the whole workflow.
A fully managed workflow orchestration service built on Apache Airflow
- Google Dataflow, responsible for managing the data pipeline.
Simplified stream and batch data processing with the same reliability and expressiveness
- BigQuery, our database.
A fast, highly scalable, cost-effective, and fully managed cloud data warehouse for analytics, with machine learning built in
- Data Studio, our tool for creating reports.
Unlock the power of your data with interactive dashboards and beautiful reports that inspire smarter business decisions.
2. Motivation of the Proposal
For this architecture we made a point of not installing any library or tool locally; everything runs in the cloud.
It covers the client's requirements and others, such as on-demand scalability, a simplified open-source programming model, cost control, and automatic resource management, which, according to Google, offers:
“Practically unlimited resources.”
2.1 Personal Motivation for Architecture
2.1.1 Why draw every solution in the cloud and not locally?
Since my computer is not very powerful, trying out any solution locally would be painful.
2.1.2 Why use Google Cloud?
I chose Google because I already have plenty of contact with the Firebase ecosystem, including Cloud Functions, from a personal project with Ionic. Also because I won a voucher to use all Google Cloud services for free for 6 months.
2.1.3 Why use Python and not Java?
Because I already have plenty of experience with Java, and for this type of project Python gives me cleaner code.
2.1.4 Why use Google DataStudio?
I used it here to simplify our example; its report customization is quite limited. I would recommend solutions such as Tableau, SAP BO, or SAS, among others, depending on the budget.
3. Pipeline execution
Follow the steps below, where we simulate the whole pipeline execution by inserting a file into the your-project-name-on-google bucket, following it from the moment the CSV is added to the bucket until the view in BigQuery is updated with the indicators.
3.1. Access the Cloud Shell terminal.
3.2. Run the following command to copy a CSV from the temp bucket to the bucket monitored by Cloud Functions:
gsutil cp -p gs://your-project-name-on-google-bucket-navi-temp/data_navigational_p1.csv gs://your-project-name-on-google/data_navigational_p1.csv
3.3. Watch the trigger fire in Cloud Functions.
3.4. Check that Google Composer is up, and look at the log if you want.
3.5. Note that the pipeline has started in Google Dataflow; wait for the job to finish.
3.6. See the updated your-project-name-on-google.dataNavigationDataSet.RAW_DATA_NAVIGATION table, as well as the KPIs.
3.7. See that the processed files were removed from the your-project-name-on-google bucket and moved to the your-project-name-on-google-bucket-navi-out bucket (a sketch for checking this from code follows these steps).
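To check this last step from code instead of the console, here is a minimal sketch assuming the google-cloud-storage client library and the placeholder bucket names used in this article:

# Lists what is still pending in the monitored bucket and what was moved to the "out" bucket.
from google.cloud import storage

client = storage.Client(project='your-project-name-on-google')

print('Still waiting for processing:')
for blob in client.list_blobs('your-project-name-on-google'):
    print(' ', blob.name)

print('Already processed and moved:')
for blob in client.list_blobs('your-project-name-on-google-bucket-navi-out'):
    print(' ', blob.name)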
4. Implementation
4.1 Dataflow
Dataflow screens during job execution:


Example of a CSV file that should be imported into your Google Storage bucket:
Example Python file with the job implementation to run in Google Dataflow:
With the following important points for script customization:
Line 121: the separation of the fields in the CSV and the type of each one, used to create the database table.
schema='load_timestamp:STRING,ip:STRING,visit_id:STRING,device_type:STRING,url_location:STRING,page_type:STRING,search_query:STRING,product_id:STRING,site_department_id:STRING,product_unit_price:STRING,freight_delivery_time:STRING,freight_value:STRING,cart_qty:STRING,cart_total_value:STRING',
The types are the same as those in the BigQuery table schema.
Line 38: takes the CSV lines and transforms them into a format BigQuery understands, specifying the order of the fields in the CSV.
dict(zip(
    ('load_timestamp', 'ip', 'visit_id', 'device_type', 'url_location', 'page_type', 'search_query', 'product_id', 'site_department_id', 'product_unit_price', 'freight_delivery_time', 'freight_value', 'cart_qty', 'cart_total_value'),
    values))
Line 104: CREATE_IF_NEEDED creates the table if it does not exist. Line 106: WRITE_TRUNCATE deletes the table contents before writing; that is fine for this example, but in production it should be WRITE_APPEND.
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
# Deletes all data in the BigQuery table before writing.
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
Here you can see more details of these parameters: https://beam.apache.org/documentation/io/built-in/google-bigquery/
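Putting these points together, the overall shape of storage-to-dataflow-to-bigquery.py is roughly as follows. This is only a condensed sketch of the standard Beam read-parse-write pattern, using the current beam.io.WriteToBigQuery transform; the full script may structure the write differently, and the line numbers quoted above refer to it, not to this sketch.

# Condensed sketch of the pipeline (see the full storage-to-dataflow-to-bigquery.py for details).
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

FIELDS = ('load_timestamp', 'ip', 'visit_id', 'device_type', 'url_location',
          'page_type', 'search_query', 'product_id', 'site_department_id',
          'product_unit_price', 'freight_delivery_time', 'freight_value',
          'cart_qty', 'cart_total_value')


def parse_line(line):
    # The "Line 38" step: one CSV line becomes a dict keyed by the field names above.
    return dict(zip(FIELDS, line.split(',')))


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', required=True, help='gs:// path (or pattern) of the CSV files')
    parser.add_argument('--output', required=True, help='project:dataset.table to load into')
    known_args, pipeline_args = parser.parse_known_args(argv)

    with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:
        (p
         | 'Read CSV' >> beam.io.ReadFromText(known_args.input)
         | 'Parse lines' >> beam.Map(parse_line)
         | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
             known_args.output,
             # The "Line 121" schema: every field declared as STRING.
             schema=','.join('{}:STRING'.format(f) for f in FIELDS),
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             # WRITE_TRUNCATE only for the example; use WRITE_APPEND in production.
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))


if __name__ == '__main__':
    run()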
Example of running the pipeline from the Google Cloud Shell terminal:
python storage-to-dataflow-to-bigquery.py --input gs://your-project-name-on-google-bucket-navigation/data_navigation* --output your-project-name-on-google:dataNavigationDataSet.RAW_DATA_NAVIGATION --runner DataflowRunner --project your-project-name-on-google --job_name job-name-001 --temp_location gs://your-project-name-on-google-bucket-navigation/tmp/
4.2 BigQuery

The following view, your-project-name-on-google:dataNavigationDataSet.SalesKPI, was created with the most relevant indicators:
SELECT 'Shopping Cart Abandonment' as Description, basket.losing as q1, thankyou.buy as q2, 100 - ROUND((thankyou.buy / basket.losing) * 100, 2) as rate FROM
(SELECT count(distinct visit_id) as losing FROM `your-project-name-on-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'basket') basket
CROSS JOIN
(SELECT count(distinct visit_id) as buy FROM `your-project-name-on-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'thankyou') thankyou
union all
SELECT 'Conversion rate from home', home.visit, thankyou.buy, ROUND((thankyou.buy / home.visit) * 100, 2) as conversion_rate_home FROM
(SELECT count(distinct visit_id) as visit FROM `your-project-name-on-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'home') home
CROSS JOIN
(SELECT count(distinct visit_id) as buy FROM `your-project-name-on-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'thankyou') thankyou
union all
SELECT 'Conversion rate from Search', home.visit, thankyou.buy, ROUND((thankyou.buy / home.visit) * 100, 2) as conversion_rate_search FROM
(SELECT count(distinct visit_id) as visit FROM `your-project-name-on-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'search') home
CROSS JOIN
(SELECT count(distinct visit_id) as buy FROM `your-project-name-on-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'thankyou') thankyou
union all
SELECT 'Conversion rate from Product', home.visit, thankyou.buy, ROUND((thankyou.buy / home.visit) * 100, 2) as conversion_rate_product FROM
(SELECT count(distinct visit_id) as visit FROM `your-project-name-on-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'product') home
CROSS JOIN
(SELECT count(distinct visit_id) as buy FROM `your-project-name-on-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'thankyou') thankyou
union all
SELECT 'No payment - Dismiss', home.visit, thankyou.buy, 100 - ROUND((thankyou.buy / home.visit) * 100, 2) as dismiss_rate FROM
(SELECT count(distinct visit_id) as visit FROM `your-project-name-on-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'payment') home
CROSS JOIN
(SELECT count(distinct visit_id) as buy FROM `your-project-name-on-google.dataNavigationDataSet.RAW_DATA_NAVIGATION`
where page_type = 'thankyou') thankyou
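To consume these KPIs outside of Data Studio, for example from a notebook or a scheduled job, a minimal sketch using the google-cloud-bigquery client library (with the placeholder project name used throughout this article) could look like this:

# Reads the SalesKPI view; project and dataset names are the article's placeholders.
from google.cloud import bigquery

client = bigquery.Client(project='your-project-name-on-google')
sql = 'SELECT * FROM `your-project-name-on-google.dataNavigationDataSet.SalesKPI`'

for row in client.query(sql).result():
    # Columns as defined by the first SELECT of the view: Description, q1, q2, rate.
    print(row.Description, row.q1, row.q2, row.rate)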
4.3 Data Studio
Example of possible dashboards and reports that can be created:

Main Dashboard

5. Evolution Proposal
The ideal world would be to evolve toward the following workflow; if you want to contribute in some way, commits are welcome.
https://github.com/samueljb/google-cloud-solution-escalavel-importacao-csv

6. Who am I?
I am Samuel Balbino, passionate about software development and data manipulation.
If you want to be my virtual friend, have questions, or need some consulting, just look me up on the social networks below:
https://linktr.ee/samueljboficial
7. References
gcp-batch-ingestion-pipeline-python
https://github.com/servian/gcp-batch-ingestion-pipeline-python
Triggering DAGs (workflows)
https://cloud.google.com/composer/docs/how-to/using/triggering-with-gcf
Cloud Composer Examples
Triggering a Cloud Composer DAG from Cloud Functions
Orchestrating jobs with Apache Airflow / Cloud Composer
https://hcoelho.com/blog/63/Orchestrating_jobs_with_Apache_Airflow_Cloud_Composer