Jupyter Notebook to Data Pipeline

Kaj Kandler
AWS Tip
Published in
9 min readNov 23, 2022

--

Observations from building an automated data pipeline

Recently, I worked on a project in data science. The use case is a scooter company that saw demand at big city airports for its scooters. The company wanted to develop a model to predict demand, so it could truck the right amount of scooters to the airport at the right time. The company had determined that demand depends on the arrivals at the airport and the weather conditions. It knew that people did not use scooters for longer distances in rainy conditions. Rainy days are the best time to get scooters into the garage for maintenance.

The company asked me to be the data engineer and gather the relevant data for low cost. They could not provide me with a list of airports and cities, as they were expanding constantly within Europe.

In this article, I want to share some observations I made on that project.

Work your way inside out

The first task in the project was to gather some statistics about large cities in Europe. The company wanted me to gather population, and latitude, longitude to calculate the approximate distance to airports. The manager only stated: “You can find this data on Wikipedia.”

As I did not want to copy paste the information, I set out to scrape Wikipedia’s city pages, which to the human eye look quite structured and contain tables on the side with the information desired.

Firing up a Jupyter notebook, it wasn’t hard to use the Python and the Beautiful Soup package to extract data from the first page.

With the code in hand I wrapped it into a couple of functions that took a city name and returned the desired data into a dictionary.

def scrape_page(html):
result = {}
# ... extract results here with Beatiful Soup
return result

def scrape_wikipedia_city_page(city):
url = f'https://en.wikipedia.org/wiki/{city}'

response = requests.get(url)
# ... error handling
result = scrape_page(response.content)

From there it seemed easy to make a list of cities and to extract the data in a loop.

def scrape_wikipedia_cities(city_list):
city_data = []
for city in city_list:
city_data.append(scrape_wikipedia_city_page(city))
return city_data

However, that proved easier said than done. The second city did not return all data. While the page looked identical in its rendered form, the HTML code was slightly different. So I had to make the scraping code with Beautiful Soup more flexible to account for both versions.

I iterated over several cities until my loop did return complete results and did not break anymore, when I added new cities.

Finally, I stored the results into a pandas data frame and persisted it into a Comma Separated Values (CSV) file.

city_list = []
city_data = scrape_wikipedia_cities(city_list)
cities_df = pd.DataFrame(city_data)

cities_df.to_csv('cities.csv')

The pattern of writing small functions allowed me to test each part and to assemble growing complexity. Further the nested set of functions, one for a simple task, one for a loop to repeat the task and some more to persist it proved useful in other steps of this project.

Separate Collection, Transformation and Storage

In the next step, I needed to get a weather forecast for the cities the scooter company does business in.

Luckily there is a free API available to get a five day weather forecast in three hour intervals at openweathermap.org.

Building on the same principle as before, I did write a first method to get the forecast for a single city. I decided to return the raw JSON data from the method.

def fetch_5_day_weather_forecast_for_city(city):
weather_url = f"https://api.openweathermap.org/data/2.5/forecast?q={city}&units=metric&APPID={API_key}"
result = requests.get(weather_url)
if result.status_code != 200:
print(f'Access for {city} failed with {result.status_code}')
print(result.text)
return result.json()

In the next step I wrote the method to loop over all cities, collecting the JSON data in a list.

def fetch_5_day_weather_forecast(list_of_cities):
records = []
for city in list_of_cities:
records.append(fetch_5_day_weather_forecast_for_city(city))
return records

Only now, I did extract the data into a data frame, transforming each forecast into a record for each timestamp. So a single API call resulted in eight 3h forecasts times 5 days = 40 records.

def extract_forecast(raw_weather):
forecasts = []
for weather in raw_weather:
one_forecast = {}
for forecast in weather['list'].values:
one_forecast['city_id'] = weather['city']['id']
one_forecast['city_name'] = weather['city']['name']
one_forecast['city_country'] = weather['city']['country']
one_forecast['lat'] = weather['city']['coord']['lat']
one_forecast['lon'] = weather['city']['coord']['lon']
one_forecast['time'] = forecast['dt'] # Time of data forecasted, unix, UTC
one_forecast['summary'] = forecast['weather'][0]['main']
forecasts.append(one_forecast)
return forecasts

In additional methods I converted the data into a data frame and persisted the data to a file.

This structure of small dedicated methods came in very handy later, when I realized that the weather forecast is only useful for three days. To change the code was easy because I knew the exact place where I could apply this filter.

While it seems excessive to write 4+ functions for this fairly simple task, it has the advantage that I can name each function in a way that the code reads like English prose. This saves on documenting the code.

Create a local prototype

For projects like this, where you don’t know all the implementation plans, Jupyter notebooks are a wonderful tool. You can start exploring the problems and write code w/o having to deploy software to a server. You don’t need the overhead of deployment this early in the project.

You can work with Jupyter notebooks in various ways. You can use the local JupyterLab Server, an Integration into your favorite IDE or a cloud service like Google Colab.

In this project I chose the IDE, as I’m familiar with Visual Studio Code and I can easily store data on my local disk.

This setup also lets me easily switch components, such as storage in the file system vs. in a SQL database.

The local prototype lets me demonstrate progress to the stakeholders, instead of telling them that I need a few days to set up the infrastructure for the final deployment.

On a local machine one can also be more lenient with security, as it is not so critical to lose a password to a MySQL instance on my laptop, or the API key to the Open Weathermap API.

Coping with budget limitations

As mentioned in the introduction, the project needed to be realized on a shoe string budget. I was not able to find updated flight data for free. So I turned to Aero Data Box API on rapidapi.com

Rapid API is a commercial API provider. Before committing to a contract, I wanted to proof it is worth its money.

Luckily, Rapid API provides a free tier for development. However, it is limited to 400 calls / month. This limitation prohibited me from requesting frequent updates and looping over many airports before a contract was signed.

In order to develop and test the entire application I needed to cache the data. I did write a function that restored the data from a file, if the file already existed. Only newly requested data was fetched from the API.

ARRIVALS_CACHE = 'arrivals.csv'

def fetch_arrivals_for_airports(airports, date)
arrivals = []
for airport in airports:
fetch_arrivals(airport, date)

def load_flights()
if exists(ARRIVALS_CACHE):
arrivals_df = pd.read_json(flights_arrivals_file)
else:
airports = pd.read_csv('airports.json')['icao'].values
arrivals = fetch_arrivals_for_airports(airports, tomorrow)
arrivals_df = filter_arrivals(arrivals)
return arrivals_df

def load_flights_creating_cache():
arrivals_df = load_flights()
arrivals_df.to_csv(ARRIVALS_CACHE)
return arrivals_df

This layer of local caching allowed me to limit API usage to a few requests a day. The rest of the development happened against cached data.

Putting the cache mechanism into its own function allowed me to switch it out easily later for production use.

Creating an Automated Data Pipeline

Once the programs were written to gather all the data it was time to store the data in a way that allows better access for the analysis model.

The company agreed that a SQL database would be the right choice. Hence I turned the data frames into an entity relationship model and setup a MySql database locally.

Finally, the data gathering should run unattended every night. This meant the application needed to be transferred to a server that runs 24 / 7 and can execute the tasks periodically.

The company agreed that AWS lambda functions and other services are the appropriate choice for running this kind of application on a low budget. Because AWS lambda only bills for the time the functions are executed, likely a few cents per day.

However, the database storage will incur a bit more cost.

AWS Lambda executes pure functions, based on a triggering event. Another service AWS Event bridge can provide the trigger once at night every day.

Secure your secrets

Different parts of this application connect to different services which are secured by password or API keys. Storing these keys in the code is a security risk.

Secrets stored in code are exposed to code repositories, such as git. Further they are stored in the cloud infrastructure used and the code deployment there.

There are various ways to secure your secrets or at least make them harder to access.

In the local version of the application you can separate the secrets from the rest of the code. You can use a python file and use it as a module, where you store all your secrets.

# access.py
weather_api_key = '.....'
flights_api_key = '*****'
mysql_user = 'a-non-admin'
mysql_password = '**..**'
from access import weather_api_keypy

You can ensure that this file is not persisted in the source repository (for example use .gitignore)

# .gitignore
access.py

Actually, for the local prototype I chose another method, prompting the user for the secret. As my development environment was Jupyter I could store the secrets in memory. I used getpass to prompt for the secret, whenever I had to restart the jupyter runtime. While this is a bit of a hassle, it is not too big a price to pay.

import getpass
weather_api_key = getpass.getpass(prompt='Enter the API key for openweather API')

For cloud deployment you can use services like AWS Secrets Manager. It has a steep learning curve, but it offers encryption while stored, as well as encryption while transferred to your application. It even has provisions to seamlessly rotate your passwords and API keys.

However, security should depend on the value of the assets to protect. In the case of this data pipeline, the damage of unauthorized access would be some costs for the APIs and that is fairly limited. So I decided that hiding the secrets in a layer attached to the lambdas is enough.

To hide the secrets you’d create a python file or module, code the secrets in there and create a custom layer for your lambdas.

# Create a corking directory
mkdir my-layer
# copy the file(s) into the working directory
mv access.py my-layer
# iN the working directory ...
cd my-layer
# install the code as package
zip my-access-layer.zip access.py
# upload the zip file as layer to AWS

Conclusion — Keep the end goal in mind

In summary, when undertaking Data Science projects, you want to start with a prototype and iterate until you have a code basis that has all the features you need and all the automation you desire.

While Data Science often deals with big data, it is best to start with a small set of data. Good code structure with small, single purpose methods helps you to progress through the different stages of your project. Over time you can assemble the different parts to a whole.

While an iterative approach can work here very well, you want to do a little planning beforehand.

First, try to understand in some detail, what data the analysis will need. This will save you iterations, because you can extract and filter the right attributes.

Second, try to make estimates about the quantity of data you will collect, store and later process. Consider also how often you will add or update data. This will give you some estimate about the resources required for the final project in production.

Further consider that the largest driver of development cost is the time it takes to write the code. Don’t be penny wise and pound foolish by setting a a near zero budget for APIs and other resources during development. In this project the coding the caches to avoid a small amount of API cost did cost extra hours of work, testing and frustration that would have been worth the a few dollars to pay for one month of API access.

I hope these observations will make you successful in your next project.

--

--

Knowledge Graph Optimization | Specializing in personal branding for Public Speakers and Coaches | Educating search engines about notable people