Data Ingestion from Data Catalogs to Unity Catalog
Data Pipeline Studio supports data ingestion from Lazsa Data Ingestion Catalogs to a Unity Catalog data lake. You can crawl data from various types of sources and create a data catalog from it, which you can then use in the data source stage of a data pipeline. Data crawlers can connect to various data sources, including databases, data lakes, APIs, and file systems, providing wider visibility and deeper access to data.
To use a data ingestion catalog as a data source in a data pipeline, you must first create a crawler using a source such as CSV, MS Excel, Parquet, FTP, or SFTP. After creating a crawler, you create a data catalog from it. While creating a data catalog, you can filter the data that you bring from the crawler to the data catalog. For more information, see Data Crawler and Data Catalog.
Data Pipeline Studio currently supports the following sources for creating a crawler and catalog and using it as a Lazsa Data Ingestion Catalog in the source stage of a pipeline:
- CSV
- MS Excel
- Parquet
- FTP (using CSV, XLSX, JSON, and Parquet file formats)
- SFTP (using CSV, XLSX, JSON, and Parquet file formats)
- MySQL
- MS SQL Server
- Oracle
- PostgreSQL
- REST API (using CSV and JSON formats)
- Snowflake
Prerequisites
- Access to a Databricks node with Unity Catalog enabled, which is used as the data integration node in the data ingestion pipeline. The Databricks Runtime version of the cluster must be 14.3.
- Access to a Databricks Unity Catalog node, which is used as the data lake in the data ingestion pipeline.
Creating a data ingestion pipeline
-
On the home page of Data Pipeline Studio, add the following stages and connect them as shown below:
-
Data Integration (Databricks - Unity Catalog enabled)
-
Data Lake (Databricks Unity Catalog)
In this example, FTP is used in the data source node.
-
Configure the data source node.
-
Configure the data lake node.
-
From the Use an existing Databricks Unity Catalog dropdown, select an instance, and then click Add to data pipeline.
-
Click the dropdown Schema Name and select a schema.
-
Click Data Browsing. Browse the folders and view the required files. This step is optional.
-
Click Save.
-
-
Click the Databricks node in the data integration stage. Click Create Templatized Job. Complete the following steps to create the job:
-
Template - Based on the source and destination that you choose in the data pipeline, the template is automatically selected.
-
Job Name - Provide a name for the data integration job.
-
Node Rerun Attempts - Specify the number of times a rerun is attempted on this node of the pipeline in case of failure. The default value is set at the pipeline level. You can change the number of rerun attempts by selecting 1, 2, or 3.
-
Fault Tolerance - Define the behavior of the pipeline when this node fails: the descendant nodes can either stop and skip execution or continue their normal operation. The available options are:
-
Default - If a node fails, the subsequent nodes go into pending state.
-
Proceed on Failure - If a node fails, the subsequent nodes are executed.
-
Skip on Failure - If a node fails, the subsequent nodes are skipped from execution.
For more information, see Fault Tolerance of Data Pipelines.
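The rerun and fault tolerance settings described above can be sketched in Python as follows. This is only an illustration of the documented behavior, not the platform's implementation: the names `run_with_reruns` and `downstream_state` are hypothetical, and the sketch assumes that rerun attempts are made in addition to the first run.

```python
from enum import Enum
from typing import Callable


class FaultTolerance(Enum):
    DEFAULT = "Default"
    PROCEED_ON_FAILURE = "Proceed on Failure"
    SKIP_ON_FAILURE = "Skip on Failure"


def run_with_reruns(task: Callable[[], None], rerun_attempts: int) -> bool:
    """Run the task once, then rerun it up to rerun_attempts times on failure.

    Assumption: reruns are counted on top of the initial run.
    """
    for _ in range(1 + rerun_attempts):
        try:
            task()
            return True
        except Exception:
            continue  # try again until the attempts are used up
    return False


def downstream_state(node_succeeded: bool, mode: FaultTolerance) -> str:
    """Return what happens to the subsequent nodes for a given outcome."""
    if node_succeeded or mode is FaultTolerance.PROCEED_ON_FAILURE:
        return "executed"
    if mode is FaultTolerance.SKIP_ON_FAILURE:
        return "skipped"
    return "pending"  # Default: subsequent nodes wait in pending state
```

For example, a node that fails on every attempt with Skip on Failure selected leaves its subsequent nodes in the skipped state.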
-
-
Source - This field is automatically populated, depending on the data source node configured and added in the pipeline.
-
Catalogs - This field is automatically populated, depending on the configured catalog that you added in the pipeline.
-
Catalog Schema - This field is automatically populated, depending on the schema that you selected for the configured catalog.
-
Schema Tables - The selected tables from the configured catalog are displayed.
-
Datastore - This field is automatically populated, based on the Databricks Unity Catalog datastore that you configured for the data lake stage.
-
Catalog Name - This field is automatically populated, based on the catalog that is selected for the Unity Catalog instance.
-
Schema Name - Select the schema in which you want to ingest the data in the target data lake.
-
Object Tagging - You can assign tags to various data assets like tables, views, and files in the Databricks environment. Tagging allows you to categorize and manage data assets effectively.
Do the following:
-
Enable Object Tagging to add tags.
After you enable tagging and complete the mapping of source data to target tables, you can add tags.
-
SQL Warehouse - Select a SQL warehouse to fetch existing tags.
-
-
Map source data to target tables - Map the source file with a table in the target.
-
Source - Select a source file.
-
Target Table - You can either map an existing table or create a new table and map it. Do one of the following:
-
Select a table from the dropdown.
-
Type a name for a new table and click Click to create "table name".
-
-
-
In the Tags for Target column, click Add Tags.
-
On the Add Tags side drawer, select a key from the dropdown and provide a value for each key.
-
Click Save.
-
Mapped Data - This provides the mapping of the source file with the target table that was done in the previous stage. You can do the following:
-
Change the mapping of the columns, if required.
-
Edit the names of the columns, if required.
-
Add constraints
-
-
Infer Source Schema - This option lets you identify the schema of columns with data types automatically.
-
Evolve Schema - This option lets you add or drop a column in the target table depending on the change in the source table.
-
Filter columns from selected tables - Deselect the columns that you do not need for your use case and apply a constraint to each remaining column.
-
Deselect columns that are not required, from the list of columns that is populated, and provide custom names to certain columns.
-
Select a constraint from the options: Set Not Null and Check. For the Check constraint, you must provide a SQL condition that is checked for the column.
-
-
Continue job run even if constraints are not met - This toggle ensures that the job run is continued even if a constraint is not met.
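A minimal Python sketch of how the two constraint types and the toggle above might interact. The function names are hypothetical, the Check condition is modeled as a Python predicate instead of a SQL condition, and routing violating rows to a rejected list is an assumption based on the rejected records table mentioned elsewhere on this page.

```python
from typing import Callable, Dict, List, Tuple

Row = Dict[str, object]
Rule = Callable[[Row], bool]


def set_not_null(column: str) -> Rule:
    """Rule that passes only when the column has a value."""
    return lambda row: row.get(column) is not None


def check(condition: Rule) -> Rule:
    """Rule that stands in for a SQL condition on a column."""
    return condition


def apply_constraints(
    rows: List[Row], rules: List[Rule], continue_on_violation: bool
) -> Tuple[List[Row], List[Row]]:
    """Split rows into accepted and rejected, or stop on the first violation."""
    accepted: List[Row] = []
    rejected: List[Row] = []
    for row in rows:
        if all(rule(row) for rule in rules):
            accepted.append(row)
        elif continue_on_violation:
            rejected.append(row)  # assumed: the job continues, the row is set aside
        else:
            raise ValueError(f"Constraint not met for row: {row}")
    return accepted, rejected
```

With the toggle enabled, the run completes and the violating rows are kept separately. With it disabled, the first violation stops the run.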
-
Add Custom Columns - Enable this option to add additional columns apart from the existing columns of the table. To add custom columns, do the following:
-
Column Name - Provide a column name for the custom column that you want to add.
-
Type and Value - Select the parameter type for the new column. Choose from the following options:
-
Static Parameter - Provide a static value that is added for this column.
-
System Parameter - Select a system-generated parameter from the dropdown list that must be added to the custom column.
-
Generated - Provide the SQL code to combine two or more columns to generate the value of the new column.
-
-
Click Add Custom Column after adding the details for each column.
-
Repeat steps 1-3 for the number of columns that you want to add. After adding the required custom columns, click Add Column Mapping Details.
-
To review the column mapping details, click the ellipsis (...) and click View Details.
-
Click Next.
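The three custom column types described in this step (Static Parameter, System Parameter, and Generated) can be illustrated with a short Python sketch. Everything here is hypothetical: `add_custom_columns` is not a platform function, the system parameter name is invented for illustration, and the generated value is computed with a Python function where the platform expects SQL code.

```python
from typing import Callable, Dict

Row = Dict[str, object]


def add_custom_columns(
    row: Row,
    static: Dict[str, object],
    system: Dict[str, object],
    generated: Dict[str, Callable[[Row], object]],
) -> Row:
    """Return a copy of the row with the custom columns appended."""
    enriched = dict(row)
    enriched.update(static)   # same fixed value for every row
    enriched.update(system)   # values supplied by the system at run time
    for name, expression in generated.items():
        # generated columns are derived from the existing source columns
        enriched[name] = expression(row)
    return enriched


source_row = {"first_name": "Ada", "last_name": "Lovelace"}
result = add_custom_columns(
    source_row,
    static={"region": "EMEA"},
    system={"pipeline_name": "demo_pipeline"},  # hypothetical system parameter
    generated={"full_name": lambda r: f"{r['first_name']} {r['last_name']}"},
)
```

The source row is left unchanged, and `result` contains the original columns followed by `region`, `pipeline_name`, and `full_name`.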
-
Map source data to target tables - Select the mapped source to an existing or new target table.
-
Target - Click the dropdown and select an existing table, or type a new name and click Click to create "table name".
-
Operation Type - Select the operation type to perform on the source data. Choose one of the following options:
-
Append - Adds new data at the end of a file without erasing the existing content.
-
Merge - Adds data to the target table for the first job run. For each subsequent job run, the data from the target table is merged with the change in source data.
-
Overwrite - Replaces the entire content of a file with new data.
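To make the difference between the three operation types concrete, here is a minimal Python sketch that models the target as an in-memory list of rows. The `apply_operation` function and the `id` merge key are hypothetical, and treating Merge as a keyed upsert is an assumption. The platform's actual merge logic may differ.

```python
from typing import Dict, List

Row = Dict[str, object]


def apply_operation(
    target: List[Row], source: List[Row], operation: str, key: str = "id"
) -> List[Row]:
    """Return the new target content after one job run."""
    if operation == "append":
        # existing rows stay, new rows are added at the end
        return target + source
    if operation == "overwrite":
        # existing rows are discarded entirely
        return list(source)
    if operation == "merge":
        # assumed upsert: rows with a matching key are replaced, others are added
        merged = {row[key]: row for row in target}
        for row in source:
            merged[row[key]] = row
        return list(merged.values())
    raise ValueError(f"Unknown operation type: {operation}")


existing = [{"id": 1, "city": "Pune"}, {"id": 2, "city": "Oslo"}]
incoming = [{"id": 2, "city": "Bergen"}, {"id": 3, "city": "Lima"}]
```

With this data, Append yields four rows, Overwrite yields the two incoming rows, and Merge yields three rows with the row for `id` 2 updated.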
-
-
-
Enable Partitioning - Enable this option if you want to use partitioning for the target data. Select from the following options:
-
Data Partition - Select a column from the dropdown list and click Add.
-
Date Based Partition - Select the type from the options: yearly, monthly, or daily. Optionally, provide a prefix to add to the partition.
Click Add. The Data mapping for mapped tables displays the mapping details. Click the ellipsis (...) to edit or delete the mapping.
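As a rough illustration of date based partitioning, the following Python sketch derives a partition value from a date, a type, and an optional prefix. The value formats shown (for example `2024-03` for monthly) and the function name `partition_value` are assumptions made for this example. The format that the platform actually writes is not specified on this page.

```python
from datetime import date

# Assumed formats for each partition type; not taken from the platform.
_FORMATS = {"yearly": "%Y", "monthly": "%Y-%m", "daily": "%Y-%m-%d"}


def partition_value(day: date, partition_type: str, prefix: str = "") -> str:
    """Build a partition value for a record dated `day`."""
    if partition_type not in _FORMATS:
        raise ValueError(f"Unknown partition type: {partition_type}")
    return f"{prefix}{day.strftime(_FORMATS[partition_type])}"
```

Records that share the same value land in the same partition, so a coarser type (yearly) produces fewer and larger partitions than a finer one (daily).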
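Availability options (for the Availability field in the cluster details table):
- Spot
- On-demand
- Spot with fallback
Notification events (for the Events field in the SQS and SNS table):
- Select All
- Node Execution Failed
- Node Execution Succeeded
- Node Execution Running
- Node Execution Rejected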

Provide job details for the data integration job:


Note:
If there is a change to the source schema, then you must run the data crawler again and create a new data catalog to fetch the latest metadata.
Click Next.

Click Map Table. To delete a mapping, click the ellipsis (...) and then click Delete.
The summary table shows the table mapping, the table for rejected records, and information about tags. To add tags, do the following:
Click Next.

In this step you map the source with a target table. Do the following:

In this step you select the operation type that you want to perform on the source table and the partitioning that you want to create on the target table.

You can select an all-purpose cluster or a job cluster to run the configured job. If your Databricks cluster was not created through the Calibo Accelerate platform and you want to update custom environment variables, refer to the following:

Cluster - Select the all-purpose cluster that you want to use for the data integration job, from the dropdown list.

Cluster Details | Description |
---|---|
Choose Cluster | Provide a name for the job cluster that you want to create. |
Job Configuration Name | Provide a name for the job cluster configuration. |
Databricks Runtime Version | Select the appropriate Databricks version. |
Worker Type | Select the worker type for the job cluster. |
Workers | Enter the number of workers to be used for running the job in the job cluster. You can either have a fixed number of workers or you can choose autoscaling. |
Enable Autoscaling | Autoscaling scales the number of workers up or down within the range that you specify. This helps in reallocating workers to a job during its compute-intensive phase. Once the compute requirement drops, the excess workers are removed. This helps control your resource costs. |
Cloud Infrastructure Details | |
First on Demand | Provide the number of cluster nodes that are marked as first_on_demand. The first_on_demand nodes of the cluster are placed on on-demand instances. |
Availability | Choose the type of EC2 instances used to launch your Apache Spark clusters: Spot, On-demand, or Spot with fallback. |
Zone | Identifier of the availability zone or data center in which the cluster resides. The provided availability zone must be in the same region as the Databricks deployment. |
Instance Profile ARN | Provide an instance profile ARN that can access the target Amazon S3 bucket. |
EBS Volume Type | The type of EBS volume that is launched with this cluster. |
EBS Volume Count | The number of volumes launched for each instance of the cluster. |
EBS Volume Size | The size of the EBS volume to be used for the cluster. |
Additional Details | |
Spark Config | To fine-tune Spark jobs, provide custom Spark configuration properties as key-value pairs. |
Environment Variables | Configure custom environment variables that you can use in init scripts. |
Logging Path (DBFS Only) | Provide the logging path to deliver the logs for the Spark jobs. |
Init Scripts | Provide the init or initialization scripts that run during the start up of each cluster. |
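
For readers who think in terms of the Databricks Clusters API, the job cluster fields in the table above roughly correspond to a cluster specification like the one built below. This is a sketch under assumptions: `build_job_cluster` is a hypothetical helper, the worker type in the usage note is only an example, and whether the Calibo Accelerate platform sends exactly these fields is not stated on this page. The field names themselves (`spark_version`, `node_type_id`, `num_workers`, `autoscale`, `aws_attributes`) follow the Databricks cluster specification.

```python
from typing import Any, Dict, Optional, Tuple

AVAILABILITY = {"SPOT", "ON_DEMAND", "SPOT_WITH_FALLBACK"}


def build_job_cluster(
    runtime_version: str,
    worker_type: str,
    workers: Optional[int] = None,
    autoscale: Optional[Tuple[int, int]] = None,
    availability: str = "SPOT_WITH_FALLBACK",
    first_on_demand: int = 1,
) -> Dict[str, Any]:
    """Build a cluster spec with either a fixed worker count or an autoscaling range."""
    if (workers is None) == (autoscale is None):
        raise ValueError("Set either a fixed worker count or an autoscaling range")
    if availability not in AVAILABILITY:
        raise ValueError(f"Unknown availability: {availability}")
    spec: Dict[str, Any] = {
        "spark_version": runtime_version,
        "node_type_id": worker_type,
        "aws_attributes": {
            "first_on_demand": first_on_demand,
            "availability": availability,
        },
    }
    if autoscale is not None:
        low, high = autoscale
        spec["autoscale"] = {"min_workers": low, "max_workers": high}
    else:
        spec["num_workers"] = workers
    return spec
```

For example, `build_job_cluster("14.3.x-scala2.12", "i3.xlarge", autoscale=(2, 8))` describes a Runtime 14.3 cluster that scales between two and eight workers.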

You can configure the SQS and SNS services to send notifications related to the node in this job. This provides information about various events related to the node without actually connecting to the Calibo Accelerate platform.
SQS and SNS |
---|
Configurations - Select an SQS or SNS configuration that is integrated with the Calibo Accelerate platform. |
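Events - Select the events for which you want to receive notifications: Select All, Node Execution Failed, Node Execution Succeeded, Node Execution Running, or Node Execution Rejected. |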
Event Details - Select the details of the events from the dropdown list, that you want to include in the notifications. |
Additional Parameters - Provide any additional parameters that are to be added in the SQS and SNS notifications. A sample JSON is provided, you can use this to write logic for processing the events. |

What's next? Data Issue Resolver using Unity Catalog