Building a data pipeline#

import lumen
import panel as pn

pn.extension('tabulator')

Lumen dashboards are a declarative way to write data-driven applications and dashboards. However, Lumen components also provide an abstraction that can be used independently of a full dashboard YAML specification. In particular, the Pipeline component is a basis for building data transformations that can either power analysis pipelines or drive visual components rendered in a notebook or a custom Panel dashboard. This section shows how to use pipelines in this way.

First, however, we need to understand how Pipelines work, specifically the order in which operations are applied. We need to distinguish between operations that are applied by the Source and operations that are applied to the data returned by the Source. The distinction matters because various Source types support data queries, and SQL-based Sources may also support arbitrary SQL transforms. The Pipeline therefore first calls the Source.get method with the state of the Filter and SQLTransform components, which allows the Source to optimize the filter queries and transforms before returning the data. Once the data is returned as a DataFrame, the declared Transform stages are applied in sequence.
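The ordering described above can be illustrated with a toy model. This is not Lumen's implementation: `ToySource`, `ToyPipeline` and `mean_mass_by_species` are hypothetical names, the rows are made-up values, and plain dicts stand in for a DataFrame. The sketch only shows that the filter state is handed to the source first, and that the transforms then run in order on whatever the source returns.

```python
from collections import defaultdict


class ToySource:
    """Hypothetical stand-in for a Source: it owns the rows and applies filters itself."""

    def __init__(self, tables):
        self.tables = tables

    def get(self, table, **query):
        # The source receives the filter state and applies it close to the data.
        rows = self.tables[table]
        return [r for r in rows if all(r[k] == v for k, v in query.items())]


class ToyPipeline:
    """Hypothetical stand-in for a Pipeline: query the source, then transform."""

    def __init__(self, source, table, filters=None, transforms=None):
        self.source = source
        self.table = table
        self.filters = dict(filters or {})
        self.transforms = list(transforms or [])

    @property
    def data(self):
        # Step 1: the source applies the filters.
        rows = self.source.get(self.table, **self.filters)
        # Step 2: transforms run in declaration order on the returned rows.
        for transform in self.transforms:
            rows = transform(rows)
        return rows


def mean_mass_by_species(rows):
    """Toy transform: average body mass per species."""
    groups = defaultdict(list)
    for row in rows:
        groups[row["species"]].append(row["body_mass_g"])
    return {species: sum(v) / len(v) for species, v in groups.items()}


# Made-up rows for illustration only.
source = ToySource({"penguins": [
    {"species": "Adelie", "sex": "male", "body_mass_g": 4000},
    {"species": "Adelie", "sex": "female", "body_mass_g": 3400},
    {"species": "Gentoo", "sex": "male", "body_mass_g": 5500},
    {"species": "Gentoo", "sex": "male", "body_mass_g": 5300},
]})

toy = ToyPipeline(source, "penguins", filters={"sex": "male"},
                  transforms=[mean_mass_by_species])
result = toy.data
```

Because the filter is applied inside `ToySource.get`, the transform never sees the female rows. A real Source can use the same position in the chain to push the query down to a database.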

Pipeline Diagram

Declaring a pipeline#

Just like any other component in Lumen, Pipeline components can be built using a declarative specification. For example, let us build a Pipeline that starts with a FileSource and applies a number of filters and transforms.

from lumen.pipeline import Pipeline

data_url = 'https://datasets.holoviz.org/penguins/v1/penguins.csv'

pipeline = Pipeline.from_spec({
    'source': {
        'type': 'file',
        'tables': {
            'penguins': data_url
        }
    },
    'filters': [
        {'type': 'widget', 'field': 'species'},
        {'type': 'widget', 'field': 'island'},
        {'type': 'widget', 'field': 'sex'},
        {'type': 'widget', 'field': 'year'}
    ],
    'transforms': [
        {'type': 'aggregate', 'method': 'mean', 'by': ['species', 'sex', 'year']}
    ]
})

Once declared we can inspect the current data easily:

pipeline.data
species   sex    year  bill_length_mm  bill_depth_mm  flipper_length_mm  body_mass_g
Adelie    female 2007       37.922727      18.127273         185.227273  3389.772727
                 2008       36.516000      17.460000         188.440000  3386.000000
                 2009       37.407692      17.350000         189.346154  3334.615385
          male   2007       39.950000      19.509091         188.181818  4038.636364
                 2008       40.604000      18.924000         193.640000  4098.000000
                 2009       40.557692      18.846154         194.807692  3995.192308
Chinstrap female 2007       46.569231      17.838462         188.692308  3569.230769
                 2008       46.000000      17.300000         192.666667  3472.222222
                 2009       47.008333      17.533333         194.333333  3522.916667
          male   2007       50.876923      19.130769         196.153846  3819.230769
                 2008       51.400000      19.600000         202.777778  4127.777778
                 2009       51.100000      19.125000         201.833333  3927.083333
Gentoo    female 2007       45.062500      13.993750         211.062500  4618.750000
                 2008       45.295455      14.131818         213.000000  4627.272727
                 2009       46.260000      14.550000         213.700000  4786.250000
          male   2007       49.000000      15.364706         218.882353  5552.941176
                 2008       48.539130      15.704348         222.086957  5410.869565
                 2009       50.880952      16.019048         223.095238  5510.714286
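To make the aggregate transform declared above more concrete, here is a rough pandas equivalent. It assumes that the transform behaves like a group-by on the `by` columns followed by the named method; Lumen's actual implementation may differ in details such as index handling. The rows are made-up values, not taken from the penguins dataset.

```python
import pandas as pd

# Made-up rows for illustration; these are not taken from the penguins dataset.
df = pd.DataFrame({
    "species": ["Adelie", "Adelie", "Adelie", "Gentoo"],
    "sex": ["male", "male", "female", "male"],
    "year": [2007, 2007, 2007, 2008],
    "island": ["Torgersen", "Biscoe", "Dream", "Biscoe"],
    "body_mass_g": [4000.0, 4200.0, 3400.0, 5500.0],
})

# Group by the 'by' columns and average the remaining numeric columns.
# numeric_only=True skips the non-numeric 'island' column.
agg = df.groupby(["species", "sex", "year"]).mean(numeric_only=True)
```

The result is indexed by species, sex and year, which matches the shape of the output shown above: one row per group and one column per numeric measurement.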

Note that a Pipeline updates dynamically if any Source, Filter or Transform is changed or updated. In the pipeline above we declared 'widget' filters, which we can render by accessing the control_panel property. To view the dynamically updating data we can instantiate a lumen.views.Table component. In a live notebook this gives an interactive application that lets us dynamically filter the data and view the result.

from lumen.views import Table

pn.Row(pipeline.control_panel, Table(pipeline=pipeline, pagination='remote'))

Note that since a Pipeline is a Parameterized object, we can pass its data parameter to a Panel component. Here, for example, we bind the data to a DataFrame pane:

pn.Row(
    pipeline.control_panel,
    pn.pane.DataFrame(pipeline.param.data, width=800)
)

Programmatically building pipelines#

A Pipeline does not have to be built using the declarative specification. Instead, we can programmatically build a pipeline with the same source and filters:

from lumen.sources import FileSource

source = FileSource(tables={'penguins': data_url})
pipeline = Pipeline(source=source, table='penguins')

pipeline.add_filter('widget', field='species')
pipeline.add_filter('widget', field='island')
pipeline.add_filter('widget', field='sex')
pipeline.add_filter('widget', field='year')

pipeline

Automatic filters#

By setting filters='auto' we can also have Lumen automatically generate filters for all available columns:

Pipeline(source=source, table='penguins', filters='auto')

Chaining pipelines#

In some cases you will want to build branching pipelines. For example, an initial stage filters the data and displays it, while another stage aggregates the filtered data.

from lumen.transforms import Aggregate

agg_pipeline = pipeline.chain(transforms=[Aggregate(method='mean', by=['species', 'year'])])

agg_pipeline.data
species   year  bill_length_mm  bill_depth_mm  flipper_length_mm  body_mass_g
Adelie    2007       38.824490      18.767347         186.591837  3696.428571
          2008       38.560000      18.192000         191.040000  3742.000000
          2009       38.982692      18.098077         192.076923  3664.903846
Chinstrap 2007       48.723077      18.484615         192.423077  3694.230769
          2008       48.700000      18.450000         197.722222  3800.000000
          2009       49.054167      18.329167         198.083333  3725.000000
Gentoo    2007       47.014706      14.688235         215.117647  5070.588235
          2008       46.936957      14.923913         217.565217  5019.565217
          2009       48.500000      15.276744         218.418605  5140.697674

By chaining the Pipeline we can share computations between different stages, i.e. the filtering step still occurs in the first stage.
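The idea of sharing an upstream stage can be sketched with a small toy class. `ToyStage` is a hypothetical name and is not part of Lumen; it caches its result once and does not model the dynamic updates that a real Pipeline performs when a filter changes. The rows are made-up values.

```python
class ToyStage:
    """Hypothetical pipeline stage that computes its result once and caches it."""

    def __init__(self, compute, upstream=None):
        self.compute = compute      # function applied to the upstream rows
        self.upstream = upstream    # parent stage, or None for the first stage
        self.calls = 0              # how often this stage actually did work
        self._done = False
        self._cache = None

    @property
    def data(self):
        if not self._done:
            rows = self.upstream.data if self.upstream is not None else None
            self._cache = self.compute(rows)
            self.calls += 1
            self._done = True
        return self._cache

    def chain(self, compute):
        # The new stage reads from this one instead of starting over.
        return ToyStage(compute, upstream=self)


# Made-up (species, body mass) rows for illustration only.
rows = [("Adelie", 3700), ("Adelie", 3900), ("Gentoo", 5000), ("Chinstrap", 3600)]

# First stage: filtering. Two branches then reuse the filtered rows.
filtered = ToyStage(lambda _: [r for r in rows if r[1] > 3650])
count_stage = filtered.chain(len)
max_stage = filtered.chain(lambda rs: max(mass for _, mass in rs))

count = count_stage.data
heaviest = max_stage.data
```

Both branches read from `filtered`, so the filtering function runs only once (`filtered.calls` is 1) even though two downstream stages consume its output.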

Building a dashboard#

One of the major benefits of the Pipeline architecture is that it allows Lumen components to be used outside of the context of a Lumen application. This makes it possible to build custom dashboards while still using all Lumen components. Let us compose a simple dashboard application driven by the pipelines we defined above.

from lumen.views import hvPlotUIView

pn.Row(
    pipeline.control_panel.servable(area='sidebar'),
    pn.Tabs(
        ('Plot', hvPlotUIView(pipeline=pipeline, kind='scatter', x='bill_length_mm', y='bill_depth_mm', by='species')),
        ('Table', Table(pipeline=agg_pipeline))
    ).servable()
)