IntakeSQLSource type: intake_sql
- class lumen.sources.intake_sql.IntakeSQLSource(*, filter_in_sql, excluded_tables, catalog, dask, uri, load_schema, cache_data, cache_dir, cache_metadata, cache_per_query, cache_schema, cache_with_dask, metadata_func, root, shared, name)
IntakeSQLSource extends the IntakeSource with support for SQL data.
In addition to the standard intake support for reading catalogs, the IntakeSQLSource computes the schema by querying the database instead of loading all the data into memory, and it allows SQLTransforms to be applied when querying the SQL database.
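As a rough illustration, a source of this type could be described by a plain specification dictionary keyed by the documented parameter names. The sketch below only builds that dictionary. The catalog path and the exclusion pattern are made-up values, and how the dictionary is handed to Lumen depends on the surrounding setup, which is not shown here.

```python
# Hypothetical specification for an intake_sql source (values are illustrative).
source_spec = {
    "type": "intake_sql",              # the source type named in this section
    "uri": "catalog.yml",              # made-up path to an intake catalog file
    "excluded_tables": ["STAGING.*"],  # made-up wildcard pattern
    "load_schema": True,               # matches the documented default
    "dask": False,                     # matches the documented default
}

print(sorted(source_spec))
```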
Parameters
catalog
type: dict
default: None
An inlined Catalog specification.
dask
type: bool
default: False
Whether to return a dask DataFrame.
excluded_tables
type: list
default: []
List of table names that should be excluded from the results. Supports:
- Fully qualified name: 'DATABASE.SCHEMA.TABLE'
- Schema qualified name: 'SCHEMA.TABLE'
- Table name only: 'TABLE'
- Wildcards: 'SCHEMA.*'
filter_in_sql
type: bool
default: True
load_schema
type: bool
default: True
Whether to load the schema.
uri
type: str
default: ''
URI of the catalog file.
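The four pattern forms accepted for excluded table names can be illustrated with a small matcher. This is a minimal sketch under stated assumptions, not Lumen's implementation: the helper `is_excluded` is hypothetical, it compares each pattern against the trailing components of the dotted table name, and it matches case-sensitively using the standard library's `fnmatch` module.

```python
from fnmatch import fnmatchcase


def is_excluded(table: str, patterns: list[str]) -> bool:
    """Return True if the dotted table name matches any exclusion pattern.

    Hypothetical helper: a pattern with n dotted components is compared
    against the last n components of the table name.
    """
    parts = table.split(".")
    for pattern in patterns:
        n = len(pattern.split("."))
        candidate = ".".join(parts[-n:])
        if fnmatchcase(candidate, pattern):
            return True
    return False


# Made-up table names and patterns for illustration.
print(is_excluded("DB.SALES.ORDERS", ["ORDERS"]))           # table name only
print(is_excluded("DB.SALES.ORDERS", ["SALES.*"]))          # schema wildcard
print(is_excluded("DB.SALES.ORDERS", ["DB.SALES.ORDERS"]))  # fully qualified
print(is_excluded("DB.SALES.ORDERS", ["HR.*"]))             # no match
```

Note that `*` in `fnmatch` also matches dots, so a real implementation may need stricter rules for multi-level names.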
Methods
- IntakeSQLSource.clear_cache(*events: Event)
Clears any cached data.
- IntakeSQLSource.create_sql_expr_source(tables: dict[str, str], **kwargs)
Creates a new SQL Source given a set of table names and corresponding SQL expressions.
- IntakeSQLSource.execute(sql_query: str, *args, **kwargs) → DataFrame
Executes a SQL query and returns the result as a DataFrame.
- Parameters:
sql_query (str) – The SQL Query to execute
*args (list) – Positional arguments to pass to the SQL query
**kwargs (dict) – Keyword arguments to pass to the SQL query
- Returns:
The result as a pandas DataFrame
- Return type:
pd.DataFrame
- IntakeSQLSource.get(table, **query)
Applies SQL transforms, creating a new temporary catalog on the fly and querying the database.
- IntakeSQLSource.get_metadata(table: str | list[str] | None) → dict
Returns metadata for one, multiple or all tables provided by the source.
The metadata for a table is structured as:
    {
        "description": …,
        "columns": {
            <COLUMN>: {
                "description": …,
                "data_type": …,
            }
        },
        **other_metadata
    }
If a list of tables or no table is provided the metadata is nested one additional level:
    {
        "table_name": {
            "description": …,
            "columns": {
                <COLUMN>: {
                    "description": …,
                    "data_type": …,
                }
            },
            **other_metadata
        }
    }
- Parameters:
table (str | list[str] | None) – The name of the table(s) to return the metadata for. If None, returns the metadata for all available tables.
- Returns:
metadata – Dictionary of metadata indexed by table (if a list of tables or no table was provided), or the metadata of the individual table.
- Return type:
dict
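Because the result has two layouts, calling code usually needs to know which one it received. The following is a minimal sketch of reading column descriptions from either layout. The helper `column_descriptions` and the sample metadata are made up for illustration and follow only the structure described above.

```python
from __future__ import annotations

from typing import Any


def column_descriptions(metadata: dict[str, Any], table: str | None = None) -> dict[str, str]:
    """Map column names to their descriptions.

    Hypothetical helper: pass `table` when `metadata` uses the nested,
    per-table layout; omit it for the single-table layout.
    """
    table_meta = metadata[table] if table is not None else metadata
    return {
        column: info.get("description", "")
        for column, info in table_meta.get("columns", {}).items()
    }


# Made-up metadata in the single-table layout.
single = {
    "description": "Customer orders",
    "columns": {
        "order_id": {"description": "Primary key", "data_type": "INTEGER"},
        "amount": {"description": "Order total", "data_type": "DOUBLE"},
    },
}
# The same metadata in the nested layout, indexed by table name.
nested = {"orders": single}

print(column_descriptions(single))
print(column_descriptions(nested, table="orders"))
```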
- IntakeSQLSource.get_schema(table: str | None = None, limit: int | None = None, shuffle: bool = False) → dict[str, dict[str, Any]] | dict[str, Any]
Returns JSON schema describing the tables returned by the Source.
- Parameters:
table (str | None) – The name of the table to return the schema for. If None, returns the schema for all available tables.
limit (int | None) – Limits the number of rows considered for the schema calculation.
- Returns:
JSON schema(s) for one or all the tables.
- Return type:
dict
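The return annotation suggests two shapes: a single-table schema, or a dictionary of such schemas indexed by table name. The sketch below shows one way to consume a single-table schema. It assumes that each column maps to a JSON schema fragment carrying a standard "type" keyword, which this page does not spell out. The helper `columns_by_type` and the sample schema are made up.

```python
from __future__ import annotations

from typing import Any


def columns_by_type(schema: dict[str, Any]) -> dict[str, list[str]]:
    """Group the columns of a single-table schema by their "type" keyword."""
    grouped: dict[str, list[str]] = {}
    for column, fragment in schema.items():
        grouped.setdefault(fragment.get("type", "unknown"), []).append(column)
    return grouped


# Made-up single-table schema (assumed layout: column name -> schema fragment).
orders_schema = {
    "order_id": {"type": "integer"},
    "amount": {"type": "number"},
    "customer": {"type": "string"},
    "region": {"type": "string"},
}
# Assumed all-tables layout: table name -> single-table schema.
all_schemas = {"orders": orders_schema}

print(columns_by_type(orders_schema))
print({name: columns_by_type(schema) for name, schema in all_schemas.items()})
```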
- IntakeSQLSource.get_sql_expr(table)
Returns the SQL expression corresponding to a particular table.
- IntakeSQLSource.get_tables()
Returns the list of tables available on this source.
- Returns:
The list of available tables on this source.
- Return type:
list
- IntakeSQLSource.normalize_table(table: str) → str
Allows implementing table name normalization to enable fuzzy matching of table names with minor variations such as quoting differences.
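One way such normalization could work is to strip quoting characters from each part of a dotted name, so that differently quoted spellings compare equal. This is a sketch only and not the behavior of any particular source: the function `normalize_table_name` is hypothetical, and it deliberately leaves letter case untouched.

```python
def normalize_table_name(table: str) -> str:
    """Strip whitespace and common SQL quoting characters from each name part.

    Hypothetical helper: splits on dots, so quoted names that themselves
    contain dots are not handled.
    """
    parts = [part.strip().strip("\"`'[]") for part in table.split(".")]
    return ".".join(parts)


# Made-up table names with different quoting styles.
print(normalize_table_name('"SALES"."ORDERS"'))
print(normalize_table_name("`orders`"))
print(normalize_table_name("[dbo].[orders]"))
```

With names normalized this way, a lookup can compare `normalize_table_name(requested)` against the normalized names of the available tables instead of requiring an exact string match.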
- IntakeSQLSource.to_spec(context: dict[str, Any] | None = None) → dict[str, Any]
Exports the full specification to reconstruct this component.
- Return type:
dict[str, Any]