pygrametl - ETL programming in Python

pygrametl (pronounced py-gram-e-t-l) is a Python framework that provides functionality commonly used when developing Extract-Transform-Load (ETL) programs. It is fully open-source and released under a 2-clause BSD license. As shown in the figure below, an ETL program that uses pygrametl is a standard Python program that imports pygrametl and uses the abstractions it provides. To provide developers with complete control over the data warehouse's schema, pygrametl assumes that all of the dimension tables and fact tables used in the ETL program have already been created using SQL.

ETL process using pygrametl
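Because pygrametl leaves schema creation to SQL, the tables must exist before the ETL program runs. The sketch below shows what that could look like for part of the example schema described further down: the test and date dimensions and the testresults fact table. It assumes SQLite through Python's built-in sqlite3 module instead of the PostgreSQL setup used in the example program. The column types and the create_schema helper are illustrative assumptions, not the official schema.

```python
import sqlite3

# Sketch only: SQLite stands in for PostgreSQL, and the column types are
# assumptions. The tables are created with plain SQL before any ETL code
# runs; the ETL program then only looks up and inserts rows.
DDL = """
CREATE TABLE test (
    testid     INTEGER PRIMARY KEY,
    testname   TEXT NOT NULL,
    testauthor TEXT
);
CREATE TABLE date (
    dateid   INTEGER PRIMARY KEY,
    date     TEXT NOT NULL,
    day      INTEGER,
    month    INTEGER,
    year     INTEGER,
    week     INTEGER,
    weekyear INTEGER
);
CREATE TABLE testresults (
    pageid INTEGER NOT NULL,
    testid INTEGER NOT NULL REFERENCES test (testid),
    dateid INTEGER NOT NULL REFERENCES date (dateid),
    errors INTEGER
);
"""


def create_schema(conn):
    """Create the dimension and fact tables using plain SQL."""
    conn.executescript(DDL)
    conn.commit()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    create_schema(conn)
    tables = [name for (name,) in conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name")]
    print(tables)  # ['date', 'test', 'testresults']
```

Keeping the DDL outside the ETL code means the developer decides on keys, types, and constraints, and the Python program only has to know table and column names.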

Defining the data warehouse's schema using SQL and implementing the ETL program itself in standard Python turns out to be very efficient and effective, even when compared to drawing the program in a graphical user interface like Apache Hop or Pentaho Data Integration. pygrametl supports both CPython and Jython, so the ETL program can use existing Python code that relies on native extension modules and PEP 249 connectors as well as JVM-based code that uses JDBC drivers.

When using pygrametl, the developer creates an object for each data source, dimension, and fact table and operates on rows in the form of standard Python dicts. Thus, (s)he can easily read rows from a data source using a loop like for row in datasource:, transform the rows using arbitrary Python code like row["price"] *= 1.25, and then add new dimension members to a dimension and facts to a fact table using dimension.insert(row) and facttable.insert(row), respectively. This is a very simple example, but pygrametl also supports much more complicated scenarios. For example, it is possible to create a single object for an entire snowflaked dimension and then add a new dimension member with a single method call: snowflake.insert(row). This automatically performs all of the necessary lookups and insertions in the tables that participate in the snowflaked dimension. pygrametl also supports multiple types of slowly changing dimensions. Again, the programmer only has to invoke a single method, slowlychanging.scdensure(row), which performs the needed updates of both type 1 (i.e., overwrites) and type 2 (i.e., adding new versions).
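To make the row-at-a-time style concrete without a database server or pygrametl itself, here is a stdlib-only sketch. SimpleDimension is a hypothetical class that mimics the lookup, insert, and ensure behavior of a dimension object as used in the example program further down (ensure returns the key of an existing member or inserts a new one); it is not pygrametl's implementation. The product and sales tables, the in-memory CSV data, and the use of SQLite are likewise assumptions made for the illustration.

```python
import csv
import io
import sqlite3


class SimpleDimension:
    """Hypothetical stand-in for a dimension object; not pygrametl's class.

    Table and column names are interpolated into the SQL because they come
    from the program itself, never from the data being loaded.
    """

    def __init__(self, conn, name, key, attributes, lookupatts):
        self.conn, self.name, self.key = conn, name, key
        self.attributes, self.lookupatts = attributes, lookupatts

    def lookup(self, row):
        """Return the key of the member matching row on lookupatts, or None."""
        where = " AND ".join(f"{a} = ?" for a in self.lookupatts)
        hit = self.conn.execute(
            f"SELECT {self.key} FROM {self.name} WHERE {where}",
            [row[a] for a in self.lookupatts]).fetchone()
        return hit[0] if hit else None

    def insert(self, row):
        """Insert a new member and return the key SQLite assigned to it."""
        cols = ", ".join(self.attributes)
        marks = ", ".join("?" for _ in self.attributes)
        cur = self.conn.execute(
            f"INSERT INTO {self.name} ({cols}) VALUES ({marks})",
            [row[a] for a in self.attributes])
        return cur.lastrowid

    def ensure(self, row):
        """Return the key of an existing member, inserting it if it is missing."""
        key = self.lookup(row)
        return key if key is not None else self.insert(row)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE product (productid INTEGER PRIMARY KEY, "
             "name TEXT, category TEXT)")
# The sales table plays the role of a fact table.
conn.execute("CREATE TABLE sales (productid INTEGER, price REAL)")
productdim = SimpleDimension(conn, "product", "productid",
                             ["name", "category"], ["name"])

# An in-memory CSV file stands in for a real data source; every row is a dict.
source = csv.DictReader(io.StringIO(
    "name,category,price\n"
    "pen,office,2.00\n"
    "mug,kitchen,4.00\n"
    "pen,office,3.00\n"))

for row in source:
    row["price"] = float(row["price"]) * 1.25  # arbitrary Python transformation
    row["productid"] = productdim.ensure(row)  # reuse or create the member
    conn.execute("INSERT INTO sales (productid, price) VALUES (?, ?)",
                 (row["productid"], row["price"]))
conn.commit()
```

Because ensure looks a member up before inserting it, the second pen row reuses key 1 instead of creating a duplicate dimension member.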

pygrametl was first made publicly available in 2009. Since then, we have continuously made improvements and added new features. Version 2.8 was released in September 2023. Today, pygrametl is used in production systems in different sectors such as healthcare, finance, and transport.

Installation

pygrametl can be installed from PyPI with the following command:

$ pip install pygrametl

The current development version of pygrametl is available on GitHub.

$ git clone https://github.com/chrthomsen/pygrametl.git

For more information about installation, see the Install Guide.

Example Program

Here you can see a complete example of a pygrametl program. The ETL program extracts data from two CSV files and joins their content before it is loaded into a data warehouse with the following schema.

Schema

The schema consists of one fact table, testresults, and three dimensions: test, time, and page. The test and time dimensions are each represented by a single table, test and date, respectively. The page dimension is slowly changing and snowflaked into five tables: page, domain, topleveldomain, serverversion, and server.
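Filling a snowflaked dimension means handling the referenced tables before the tables that point to them, so that each foreign key is known before it is needed. The sketch below shows that ordering for the domain and topleveldomain tables of the page dimension, reusing the URL parsing from extractdomaininfo in the example program. It assumes SQLite, and ensure_member and ensure_domain are hypothetical helpers. pygrametl's SnowflakedDimension is instead configured with the references between the participating tables, and its actual implementation may differ.

```python
import sqlite3


def ensure_member(conn, table, key, atts, lookupatts, row):
    """Return the key of the row in `table` matching `row` on lookupatts,
    inserting the values of atts first if no such row exists."""
    where = " AND ".join(f"{a} = ?" for a in lookupatts)
    hit = conn.execute(f"SELECT {key} FROM {table} WHERE {where}",
                       [row[a] for a in lookupatts]).fetchone()
    if hit:
        return hit[0]
    cols = ", ".join(atts)
    marks = ", ".join("?" for _ in atts)
    cur = conn.execute(f"INSERT INTO {table} ({cols}) VALUES ({marks})",
                       [row[a] for a in atts])
    return cur.lastrowid


def ensure_domain(conn, row):
    """Fill the domain -> topleveldomain branch of the snowflake.

    The referenced table is handled first, so its key is stored in the row
    before the referencing domain table is looked up or inserted into.
    """
    row["topleveldomainid"] = ensure_member(
        conn, "topleveldomain", "topleveldomainid",
        ["topleveldomain"], ["topleveldomain"], row)
    row["domainid"] = ensure_member(
        conn, "domain", "domainid",
        ["domain", "topleveldomainid"], ["domain"], row)
    return row["domainid"]


conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE topleveldomain (topleveldomainid INTEGER PRIMARY KEY,
                             topleveldomain TEXT);
CREATE TABLE domain (domainid INTEGER PRIMARY KEY, domain TEXT,
                     topleveldomainid INTEGER REFERENCES topleveldomain);
""")

urls = ["http://www.example.org/a.html", "http://www.example.org/b.html",
        "http://www.test.org/c.html", "http://www.example.dk/d.html"]
for url in urls:
    # Same parsing as extractdomaininfo in the example program.
    domain = url.split("/")[-2]
    row = {"url": url, "domain": domain,
           "topleveldomain": domain.split(".")[-1]}
    ensure_domain(conn, row)
conn.commit()
```

After the loop, org and dk are stored once each, and the three distinct domains reference the correct top-level domain key; the second example.org URL is resolved entirely by lookups.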

The complete program is shown below. It first defines a few helper functions and then creates the pygrametl Dimension, FactTable, and Source objects. Using these objects, the main function needs only 10 lines of code to load the data warehouse. Note how easy it is to fill the page dimension even though it is both slowly changing and snowflaked.

# This is code for loading the data warehouse from the running example
# presented in C. Thomsen & T.B. Pedersen: "pygrametl: A Powerful Programming
# Framework for Extract--Transform--Load Programmers"
#
# It is made to be used with PostgreSQL and psycopg2 but you can
# modify it to work with another DBMS.


#
#  Copyright (c) 2009-2021 Aalborg University (pygrametl@cs.aau.dk)
#
#  This file is free software: you may copy, redistribute and/or modify it
#  under the terms of the GNU General Public License version 2
#  as published by the Free Software Foundation.
#
#  This file is distributed in the hope that it will be useful, but
#  WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#  General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program.  If not, see http://www.gnu.org/licenses.
#


import datetime
import sys
import time

# In this example, we use psycopg2. You can change it to another driver,
# but then the method pgcopybulkloader won't work as we use driver-specific
# code there.
# You can make another function or declare facttbl (see further below) to
# be a BatchFactTable such that you don't need special
# bulk loading methods.

import psycopg2

# Depending on your system, you might have to do something like this
# where you append the path where pygrametl is installed
sys.path.append('/home/me/code')

import pygrametl
from pygrametl.datasources import CSVSource, MergeJoiningSource
from pygrametl.tables import (CachedDimension, SnowflakedDimension,
                              SlowlyChangingDimension, BulkFactTable)


# Connection to the target data warehouse:
pgconn = psycopg2.connect(user='me')
connection = pygrametl.ConnectionWrapper(pgconn)
connection.setasdefault()
connection.execute('set search_path to pygrametlexa')


# Methods
def pgcopybulkloader(name, atts, fieldsep, rowsep, nullval, filehandle):
    # Here we use driver-specific code to get fast bulk loading.
    # You can change this method if you use another driver or you can
    # use the FactTable or BatchFactTable classes (which don't require
    # use of driver-specific code) instead of the BulkFactTable class.
    # No "global" statement is needed since connection is only read here.
    curs = connection.cursor()
    curs.copy_from(file=filehandle, table=name, sep=fieldsep,
                   null=str(nullval), columns=atts)

def datehandling(row, namemapping):
    # This method is called from ensure(row) when the lookup of a date fails.
    # In the Real World, you would probably prefill the date dimension, but
    # we use this to illustrate "rowexpanders" that make it possible to
    # calculate derived attributes on demand (such that the - possibly
    # expensive - calculations only are done when needed and not for each
    # seen data row).
    #
    # Here, we calculate all date related fields and add them to the row.
    date = pygrametl.getvalue(row, 'date', namemapping)
    (year, month, day, hour, minute, second, weekday, dayinyear, dst) = \
        time.strptime(date, "%Y-%m-%d")
    (isoyear, isoweek, isoweekday) = \
        datetime.date(year, month, day).isocalendar()
    row['day'] = day
    row['month'] = month
    row['year'] = year
    row['week'] = isoweek
    row['weekyear'] = isoyear
    row['dateid'] = dayinyear + 366 * (year - 1990)  # Supports dates from 1990 onwards
    return row


def extractdomaininfo(row):
    # Take the 'www.domain.org' part from 'http://www.domain.org/page.html'
    # In this example, the host name ('www') is also kept as part of the domain.
    domaininfo = row['url'].split('/')[-2]
    row['domain'] = domaininfo
    # Take the top level which is the last part of the domain
    row['topleveldomain'] = domaininfo.split('.')[-1]

def extractserverinfo(row):
    # Find the server name from a string like "ServerName/Version"
    row['server'] = row['serverversion'].split('/')[0]

# Dimension and fact table objects
topleveldim = CachedDimension(
    name='topleveldomain',
    key='topleveldomainid',
    attributes=['topleveldomain'])

domaindim = CachedDimension(
    name='domain',
    key='domainid',
    attributes=['domain', 'topleveldomainid'],
    lookupatts=['domain'])

serverdim = CachedDimension(
    name='server',
    key='serverid',
    attributes=['server'])

serverversiondim = CachedDimension(
    name='serverversion',
    key='serverversionid',
    attributes=['serverversion', 'serverid'])

pagedim = SlowlyChangingDimension(
    name='page',
    key='pageid',
    attributes=['url', 'size', 'validfrom', 'validto', 'version',
                'domainid', 'serverversionid'],
    lookupatts=['url'],
    versionatt='version',
    fromatt='validfrom',
    toatt='validto',
    srcdateatt='lastmoddate',
    cachesize=-1)

pagesf = SnowflakedDimension(
    [(pagedim, (serverversiondim, domaindim)),
     (serverversiondim, serverdim),
     (domaindim, topleveldim)
     ])

testdim = CachedDimension(
    name='test',
    key='testid',
    attributes=['testname', 'testauthor'],
    lookupatts=['testname'],
    prefill=True,
    defaultidvalue=-1)

datedim = CachedDimension(
    name='date',
    key='dateid',
    attributes=['date', 'day', 'month', 'year', 'week', 'weekyear'],
    lookupatts=['date'],
    rowexpander=datehandling)

facttbl = BulkFactTable(
    name='testresults',
    keyrefs=['pageid', 'testid', 'dateid'],
    measures=['errors'],
    bulkloader=pgcopybulkloader,
    bulksize=5000000)


# Data sources - change the path if you have your files somewhere else
# The buffer size is set to 16384 B, as it performed better than any alternatives we tested
downloadlog = CSVSource(open('./DownloadLog.csv', 'r', 16384),
                        delimiter='\t')

testresults = CSVSource(open('./TestResults.csv', 'r', 16384),
                        delimiter='\t')

inputdata = MergeJoiningSource(downloadlog, 'localfile',
                               testresults, 'localfile')

def main():
    for row in inputdata:
        extractdomaininfo(row)
        extractserverinfo(row)
        row['size'] = pygrametl.getint(row['size']) # Convert to an int
        # Add the data to the dimension tables and the fact table
        row['pageid'] = pagesf.scdensure(row)
        row['dateid'] = datedim.ensure(row, {'date':'downloaddate'})
        row['testid'] = testdim.lookup(row, {'testname':'test'})
        facttbl.insert(row)
    connection.commit()

if __name__ == '__main__':
    main()
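The single pagesf.scdensure(row) call in main hides the bookkeeping of a type 2 slowly changing dimension. A stdlib-only sketch of that bookkeeping for one table follows. It assumes SQLite, a simplified page table, and a hypothetical scd2_ensure function, and it is not pygrametl's implementation. The columns version, validfrom, and validto and the source attribute lastmoddate play the roles given to versionatt, fromatt, toatt, and srcdateatt in the example program. In this sketch, NULL in validto marks the current version, and type 1 attributes (which would simply be overwritten with an UPDATE) are left out.

```python
import sqlite3

# Hypothetical, simplified page table: url is the lookup attribute and
# size is tracked as a type 2 attribute.
SCHEMA = """
CREATE TABLE page (
    pageid    INTEGER PRIMARY KEY,
    url       TEXT NOT NULL,
    size      INTEGER,
    version   INTEGER NOT NULL,
    validfrom TEXT NOT NULL,
    validto   TEXT            -- NULL marks the current version
);
"""


def scd2_ensure(conn, row):
    """Return the key of the current version of the page in row, adding a
    new version (type 2) first if the tracked attribute has changed."""
    current = conn.execute(
        "SELECT pageid, size, version FROM page "
        "WHERE url = ? ORDER BY version DESC LIMIT 1",
        (row["url"],)).fetchone()
    if current is not None:
        pageid, size, version = current
        if size == row["size"]:
            return pageid  # nothing changed, reuse the current version
        # Close the old version at the date the change was observed.
        conn.execute("UPDATE page SET validto = ? WHERE pageid = ?",
                     (row["lastmoddate"], pageid))
        newversion = version + 1
    else:
        newversion = 1  # first time this url is seen
    cur = conn.execute(
        "INSERT INTO page (url, size, version, validfrom, validto) "
        "VALUES (?, ?, ?, ?, NULL)",
        (row["url"], row["size"], newversion, row["lastmoddate"]))
    return cur.lastrowid


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
url = "http://www.example.org/a.html"
changes = [
    {"url": url, "size": 100, "lastmoddate": "2023-01-01"},
    {"url": url, "size": 100, "lastmoddate": "2023-02-01"},
    {"url": url, "size": 250, "lastmoddate": "2023-03-01"},
]
keys = [scd2_ensure(conn, row) for row in changes]
conn.commit()
print(keys)  # [1, 1, 2]
```

The unchanged second row returns the existing key, while the size change closes version 1 and adds version 2 with a new key, so facts loaded earlier still refer to the page as it looked at the time.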

Documentation

The documentation is available in HTML and as a PDF. There are also installation and beginner guides available.

In addition to the documentation, multiple papers have been published on pygrametl. The papers describe the foundational ideas behind pygrametl in more detail, but they are naturally not updated to reflect later changes and improvements to the framework; for those, see the documentation.

If you use pygrametl in academia, please cite the relevant paper(s).

Community

To keep the development of pygrametl open for external participation, we have public mailing lists and use GitHub. Feel free to ask questions and provide all kinds of feedback:

When asking a question or reporting a possible bug in pygrametl, please first verify that the problem still occurs with the latest version of pygrametl. If the problem persists after updating, please include the following information, preferably with detailed version information, when reporting the problem:

We encourage the use of GitHub and the mailing lists. For discussions not suitable for a public mailing list, you can, however, send us a private email.