Connecting to Databases and External Datasources
With connectors for databases and file-based data sources, Prometheux can seamlessly integrate and migrate data across various platforms. It also supports cloud and distributed file systems such as S3 and HDFS, providing the flexibility needed for modern data lake and data migration scenarios.
@bind options
The @bind annotation configures reading from and writing to databases and other datasources. The syntax is as follows:
@bind("concept_name",
"datasource_type option_1 = 'value_1', option_2 = 'value_2', …, option_n = 'value_n'",
"database_name",
"table_name").
where datasource_type should be one of:
- csv for CSV files
- parquet for Parquet files
- excel for Excel files
- json for JSON files
- postgresql for PostgreSQL databases
- neo4j for Neo4j databases
- db2 for DB2 databases
- mariadb for MariaDB databases
- oracle for Oracle databases
- sqlite for SQLite databases
- mysql for MySQL databases
- sqlserver for SQL Server databases
- h2 for H2 databases
- sybase for Sybase databases
- teradata for Teradata databases
- redshift for Redshift databases
- bigquery for Google BigQuery
- hive for Hive
- presto for Presto
- snowflake for Snowflake
- databricks for Databricks
- api for consuming data via API
The available configuration options are:
- url: URL to use for the database connection (e.g. jdbc:postgresql://localhost:5432/prometheux)
- protocol: Protocol to use for the database connection (e.g. jdbc, odbc, jdbc-odbc, bolt)
- host: Database host (e.g. localhost)
- port: Database port (e.g. 5432 for PostgreSQL, 7678 for Neo4j)
- database: Database name (e.g. prometheux)
- username: Username to log in with.
- password: Password to log in with.
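For instance, a binding that combines several of these options to read a customer table from PostgreSQL might look as follows (a sketch only: the host, port, credentials, database, and table names are placeholders, patterned on the PostgreSQL examples later in this page):
% Declare the input concept and bind it to the 'customer' table in the 'prometheux' database
@input("customer").
@bind("customer", "postgresql host='localhost', port=5432, username='prometheux', password='myPassw'",
"prometheux", "customer").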
Configuring credential access
Sensitive credentials such as database connection details (e.g., username, password) or AWS credentials (e.g., accessKey, secretKey) can be specified directly as options in the @bind annotations. Example:
@bind("concept_name",
"datasource_type option_1 = 'value_1', option_2 = 'value_2', …, option_n = 'value_n'",
"database_name",
"table_name").
This method allows for streamlined integration within the same code, ensuring that each datasource has its necessary credentials attached during the binding process.
However, for better security and flexibility, these credentials can also be stored in external configurations:
- Credentials can be stored in the pmtx.properties file to centralize sensitive information and allow reuse without hardcoding values within the @bind annotations (see the sketch below).
- Credentials can be managed through REST APIs for dynamic configuration management, where you can set individual credentials or update multiple settings at once through API endpoints.
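As a minimal sketch, a pmtx.properties entry for file-based BigQuery credentials (the property key is the one documented in the BigQuery section below; keys for other datasources may differ in your installation):
# pmtx.properties
bigquery.credentialsFile=/path/to/gcp-credentials.json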
CSV Datasource
Prometheux supports CSV files as a data source, both for reading and writing.
The default CSV binding ("csv") is suitable for processing large CSV files. It does not guess the input schema: if no schema (@mapping) is provided, all fields are treated as strings. Values \N are treated as null values and interpreted as Labelled Nulls while reading the CSV file.
@bind options
The bind annotation configures reading and writing CSV files. The syntax is as follows:
@bind("relation",
"csv option_1 = 'value_1', option_2 = 'value_2', …, option_n = 'value_n'",
"filepath",
"filename").
The available options are:
- useHeaders: Values can be true or false, depending on whether a header is available/output.
- delimiter: Specifies the character that is used to separate single entries.
- recordSeparator: Specifies how the records are separated.
- quoteMode: Defines quoting behavior. Possible values are:
  - all: Quotes all fields.
  - minimal: Quotes fields which contain special characters such as the field delimiter, the quote character, or any of the characters in the line separator string.
  - non_numeric: Quotes all non-numeric fields.
  - none: Never quotes fields.
- nullString: Value can be any string, which replaces null values in the CSV file.
- selectedColumns: Value is a list of the form [c1;…;cn] to select only the columns c1, …, cn from the CSV file.
  - Each value in the list is either a column name (enclosed in single quotes, 'column name') that is present in the CSV's header, or an integer starting from 0 denoting the column index.
  - It is also possible to specify ranges; e.g. selectedColumns=[0:4] reads only the five columns 0,1,2,3,4.
  - It is allowed to mix the values in the list; e.g. selectedColumns=[0:3;Column_5] would select columns 0,1,2,3 and the column with the name Column_5.
  - Note that in order to select columns by name, a header line must be present in the CSV.
- multiline: Handles multi-line fields. When multiline is set to true, fields that span multiple lines are handled correctly.
- query: Performs a SELECT query over the CSV file. If you are familiar with SQL syntax, you can leverage it to query the CSV file. The fields of the select are the ones of the first line of the CSV file.
- coalesce: Unifies the output in one partition. The output will be a single CSV file instead of partitioned CSV files. Supported only in standalone environments.
When specifying a configuration, any subset of these options can be set at the same time. Each value has to be surrounded by single quotation marks ('value'). An example of a CSV bind command with configuration is the following:
@bind("relation",
"csv useHeaders=false, delimiter='\t', recordSeparator='\n',
query='select Name from users'",
"filepath",
"filename").
Examples
In the examples below we use a sample CSV file with the following content:
a,b,c
d,e,f
Simply reading a csv file into a relation:
@input("myCsv").
@bind("myCsv", "csv", "/path_to_csv/folder", "csv_name.csv").
myAtom(X,Y,Z) :- myCsv(X,Y,Z).
@output("myAtom").
We can also map the columns:
@input("myCsv").
@bind("myCsv", "csv useHeaders=true", "/path_to_csv/folder", "csv_name.csv").
@mapping("myCsv", 0, "var1", "string").
@mapping("myCsv", 1, "var2", "string").
@mapping("myCsv", 2, "var3", "string").
myAtom(X,Y,Z) :- myCsv(X,Y,Z).
@output("myAtom").
Example for selectedColumns
The following reads only the four columns with indices 0,1,2,4 from the CSV file, excluding column 3.
@input("myCsv").
@bind("myCsv", "csv selectedColumns=[0:2; 4]", "/path_to_csv/folder", "csv_name.csv").
withoutThree(W, X,Y,Z) :- myCsv(W, X,Y,Z).
@output("withoutThree").
To store results into another CSV file, you must bind the output concept. In this example, we read a CSV file, perform a SQL query to select the Name, Surname, and Age of users without mapping annotations, and write the output into another CSV file:
% Define the input source as a CSV file named "users.csv" located in "/path_to_csv/folder"
% The file contains columns "Name", "Surname", "Age" and potentially other fields. Perform a `select` query over the CSV file and keep only records where "Age > 10"; these are loaded into the "user" relation.
@input("user").
@bind("user", "csv query='select Name,Surname,Age from user where Age > 10'", "/path_to_csv/folder", "users.csv").
% Define a new relation "user_name_surname" that selects users from the "user" relation
% with an additional condition that their "Age" is less than 5. This relation returns the
% "Name", "Surname", and "Age" of users.
user_name_surname(Name, Surname, Age) :- user(Name, Surname, Age), Age < 5.
% Declare that the results of "user_name_surname" will be written to an output.
@output("user_name_surname").
% Bind the "user_name_surname" relation to output partitioned CSV files in a folder named "new_users"
% in the folder "/another_csv_path/folder/output".
@bind("user_name_surname", "csv", "/another_csv_path/folder/output", "new_users").
Parquet Datasource
Parquet is a columnar storage format optimized for both storage efficiency and query performance, making it well-suited for large-scale data lake scenarios. In this section, we'll explore how Parquet files can be integrated into Vadalog workflows, using the provided example:
% Declare the input concept 'shipping_parquet' that will be used to refer to the data
@input("shipping_parquet").
% Bind the 'shipping_parquet' concept to a Parquet file located in the specified directory
% The data source type is 'parquet', and the file is 'shipping.parquet' in the 'disk/data/input_files' folder
@bind("shipping_parquet", "parquet", "disk/data/input_files", "shipping.parquet").
% Define a rule that extracts the 'OrderId' and 'ShippingDate' from the 'shipping_parquet' data
% The 'shipping_parquet_test' concept is created from the data in the 'shipping_parquet' input
shipping_parquet_test(OrderId, ShippingDate) :- shipping_parquet(OrderId, ShippingDate).
% Declare the output concept 'shipping_parquet_test', making the processed data available for output
@output("shipping_parquet_test").
Excel Datasource
Excel files are widely used for tabular data storage and exchange in business and analytics workflows. Prometheux can easily read from and write to Excel files, integrating them seamlessly into its data processing workflows.
In this example, we will read data from a CSV file and populate an Excel file with the extracted data.
% Declare the input concept 'shipping_excel_csv' to read data from a CSV file
@input("shipping_excel_csv").
% Bind the 'shipping_excel_csv' concept to the CSV file 'shipping_data_excel.csv' located in the 'disk/data/generated_data' directory
% Use 'useHeaders=true' to indicate that the first row contains column headers
@bind("shipping_excel_csv", "csv useHeaders=true", "disk/data/generated_data", "shipping_data_excel.csv").
% Model definition for the 'shipping_excel' concept, specifying the data types for OrderId (int) and ShippingDate (date)
@model("shipping_excel", "['OrderId:int','ShippingDate:date']").
% Rule to populate the 'shipping_excel' concept by extracting OrderId and ShippingDate from 'shipping_excel_csv'
shipping_excel(OrderId, ShippingDate) :- shipping_excel_csv(OrderId, ShippingDate).
% Bind the 'shipping_excel' concept to an Excel file 'shipping.xls' in the 'disk/data/input_files' directory
% Use 'useHeaders=true' to indicate that column headers should be included in the Excel file
@bind("shipping_excel", "excel useHeaders=true", "disk/data/input_files", "shipping.xls").
% Declare the output concept 'shipping_excel' for writing to the Excel file
@output("shipping_excel").
In this example, we will read data from the previously populated Excel file.
% Declare the input concept 'shipping_excel' to read from the Excel file
@input("shipping_excel").
% Bind the 'shipping_excel' concept to the Excel file 'shipping.xls' located in 'disk/data/input_files'
% Use 'useHeaders=true' to indicate that the first row contains column headers
@bind("shipping_excel", "excel useHeaders=true", "disk/data/input_files", "shipping.xls").
% Define a rule that extracts OrderId and ShippingDate from 'shipping_excel'
shipping_excel_test(OrderId, ShippingDate) :- shipping_excel(OrderId, ShippingDate).
% Declare the output concept 'shipping_excel_test' for making the processed data available
@output("shipping_excel_test").
In this example, we will read data from a specific sheet of an Excel file.
% Declare the input concept 'shipping_excel_sheet' to read data from a specific sheet of an Excel file
@input("shipping_excel_sheet").
% Bind the 'shipping_excel_sheet' concept to the Excel file 'shipping.xls' located in 'disk/data/input_files'
% Use 'useHeaders=true' to indicate that the first row contains column headers
@bind("shipping_excel_sheet", "excel useHeaders=true, dataAddress=''Sheet1'!A1'", "disk/data/input_files", "shipping.xls").
% Define a rule that extracts OrderId and ShippingDate from 'shipping_excel_sheet'
shipping_excel_sheet_test(OrderId, ShippingDate) :- shipping_excel_sheet(OrderId, ShippingDate).
% Declare the output concept 'shipping_excel_sheet_test' for making the processed data available
@output("shipping_excel_sheet_test").
PostgreSQL Database
PostgreSQL is a robust open-source relational database that supports a wide range of data types and advanced querying capabilities. In this section, we will explore how to integrate PostgreSQL with Vadalog by first populating a customer table from a CSV file and then reading data from it using two approaches: full table read and a custom query.
In this example, we read data from a CSV file and populate the customer table in a PostgreSQL database.
% Declare the input concept 'customer_postgres_csv' to read from the CSV file
@input("customer_postgres_csv").
% Bind the 'customer_postgres_csv' concept to the CSV file located in 'disk/data/generated_data/customer_postgres.csv'
% The option 'useHeaders=true' indicates the CSV file contains headers
@bind("customer_postgres_csv", "csv useHeaders=true", "disk/data/generated_data", "customer_postgres.csv").
% Define a rule that extracts CustomerID, Name, Surname, and Email from the CSV and assigns them to the 'customer_postgres' concept
customer_postgres(CustomerID, Name, Surname, Email) :-
customer_postgres_csv(CustomerID, Name, Surname, Email).
% Define the data model for the 'customer_postgres' concept (mapping column names to types)
@model("customer_postgres", "['customer_id:int', 'name:string', 'surname:string', 'email:string']").
% Declare the 'customer_postgres' concept as the output, which will be written to the PostgreSQL database
@output("customer_postgres").
% Bind the 'customer_postgres' concept to a PostgreSQL table 'customer' in the 'prometheux' database
% Specify the database connection details (host, port, username, and password)
@bind("customer_postgres", "postgresql host='postgres-host', port=5432, username='prometheux', password='myPassw'",
"prometheux", "customer").
This example demonstrates reading the full customer table from PostgreSQL.
% Declare the input concept 'customer_postgres' to read data from the 'customer' table in PostgreSQL
@input("customer_postgres").
% Bind the 'customer_postgres' concept to the PostgreSQL table 'customer' in the 'prometheux' database
@bind("customer_postgres", "postgresql host='postgres-host', port=5432, username='prometheux', password='myPassw'",
"prometheux", "customer").
% Define a rule to extract CustomerID, Name, Surname, and Email from the 'customer' table in PostgreSQL
customer_postgres_test(CustomerID, Name, Surname, Email) :-
customer_postgres(CustomerID, Name, Surname, Email).
% Declare the output concept 'customer_postgres_test' to make the processed data available
@output("customer_postgres_test").
In this example, we read specific columns and filter data using a SQL query.
% Declare the input concept 'customer_postgres' to read data using a custom SQL query
@input("customer_postgres").
% Bind the 'customer_postgres' concept to PostgreSQL using a SQL query
% The query filters for CustomerID > 0 and selects CustomerID and Email
@qbind("customer_postgres", "postgresql host='postgres-host', port=5432, username='prometheux', password='myPassw', database='prometheux'",
"", "select CustomerID, Email from customer where CustomerID > 0").
% Define a rule to filter the emails that end with 'prometheux.ai'
customer_postgres_test(CustomerID, Email) :-
customer_postgres(CustomerID, Email), OnlyPx = ends_with(Email, "prometheux.ai"), OnlyPx = #T.
% Declare the output concept 'customer_postgres_test' to make the filtered data available
@output("customer_postgres_test").
MariaDB Database
MariaDB is a popular open-source relational database, highly compatible with MySQL. It supports various SQL features and is commonly used in web applications and data platforms. In this example, we will explore how to interact with a MariaDB database in Prometheux, focusing on reading data from the order_customer table to test if the data has been populated correctly.
% Declare the input concept 'order_mariadb' to read data from the 'order_customer' table in MariaDB
@input("order_mariadb").
% Bind the 'order_mariadb' concept to the 'order_customer' table in MariaDB
% The connection details (host, port, username, and password) are specified
@bind("order_mariadb", "mariadb host='mariadb-host', port=3306, username='prometheux', password='myPassw'",
"prometheux", "order_customer").
% Define a rule that extracts OrderId, CustomerId, and Cost from the 'order_customer' table
order_mariadb_test(OrderId, CustomerId, Cost) :-
order_mariadb(OrderId, CustomerId, Cost).
% Declare the output concept 'order_mariadb_test', making the processed data available
@output("order_mariadb_test").
Neo4j Database
Neo4j is a graph database designed for efficiently storing and querying highly connected data. In this example, we'll explore how to populate Neo4j with nodes representing Person and Order entities, as well as relationships between them. We'll also cover how to query this data using a Cypher query.
This example shows how to read data from a CSV file, populate Neo4j with Person and Order nodes, and create a relationship between them.
% Declare the input concept 'persons_order_neo4j_csv' to read data from the CSV file
@input("persons_order_neo4j_csv").
% Bind the 'persons_order_neo4j_csv' concept to the CSV file located in 'disk/data/generated_data/persons_order_neo4j.csv'
@bind("persons_order_neo4j_csv", "csv useHeaders=true", "disk/data/generated_data", "persons_order_neo4j.csv").
% Define the 'person_neo4j' concept by extracting customer details from the CSV file
person_neo4j(CustomerId, Name, Surname, Email) :-
persons_order_neo4j_csv(CustomerId, Name, Surname, Email, OrderId, Cost).
% Bind the 'person_neo4j' concept to a Neo4j node with label 'Person'
@output("person_neo4j").
@bind("person_neo4j", "neo4j username='neo4j', password='myPassw', host='neo4j-host', port=7680", "neo4j", "(:Person)").
% Map the 'person_neo4j' concept's fields to the Neo4j 'Person' node properties
@model("person_neo4j", "['customerId(ID):int', 'name:string', 'surname:string', 'email:string']").
% Define the 'order' concept by extracting order details from the CSV file
order(OrderId, Cost) :-
persons_order_neo4j_csv(CustomerId, Name, Surname, Email, OrderId, Cost).
% Bind the 'order' concept to a Neo4j node with label 'Order'
@output("order").
@bind("order", "neo4j username='neo4j', password='myPassw', host='neo4j-host', port=7680", "neo4j", "(:Order)").
% Map the 'order' concept's fields to the Neo4j 'Order' node properties
@mapping("order", 0, "orderId(ID)", "int").
@mapping("order", 1, "cost", "string").
% Define the 'order_person_rel_neo4j' concept for creating a relationship between Order and Person
order_person_rel_neo4j(OrderId, CustomerId) :-
persons_order_neo4j_csv(CustomerId, Name, Surname, Email, OrderId, Cost).
% Bind the 'order_person_rel_neo4j' concept to create a relationship between the 'Order' and 'Person' nodes in Neo4j
@output("order_person_rel_neo4j").
@bind("order_person_rel_neo4j", "neo4j username='neo4j', password='myPassw', host='neo4j-host', port=7680", "neo4j", "(:Order)-[IS_RELATED_TO]->(:Person)").
% Map the 'order_person_rel_neo4j' concept's fields to the relationship between Order and Person
@mapping("order_person_rel_neo4j", 0, "orderId:orderId(sID)", "int").
@mapping("order_person_rel_neo4j", 1, "customerId:customerId(tID)", "int").
In this example, we query Neo4j to retrieve the relationship between Person and Order nodes.
% Declare the input concept 'persons_order_neo4j' for querying Neo4j
@input("persons_order_neo4j").
% Use @qbind to execute a Cypher query that retrieves OrderId and CustomerId from related Order and Person nodes
@qbind("persons_order_neo4j", "neo4j username='neo4j', password='myPassw', host='neo4j-host', port=7680", "",
"MATCH (o:Order)-[r:IS_RELATED_TO]->(p:Person) RETURN o.orderId, p.customerId").
% Define a rule to store the result of the Cypher query into the 'persons_order_neo4j_test' concept
persons_order_neo4j_test(OrderId, CustomerId) :-
persons_order_neo4j(OrderId, CustomerId).
% Declare the output concept 'persons_order_neo4j_test' to make the query result available
@output("persons_order_neo4j_test").
S3 Storage
Amazon S3 (Simple Storage Service) is a widely used cloud storage service that allows for scalable object storage. In Vadalog, you can both read from and write to S3 buckets by treating the S3 storage as a file system. This example demonstrates how to interact with S3, specifically by writing a CSV file to S3 and then reading from it.
In this example, we first read data from a CSV file stored locally and then write it to an S3 bucket.
% Declare the input concept 'user_csv' to read from the CSV file located on the local disk
@input("user_csv").
% Bind the 'user_csv' concept to the local CSV file 'users.csv' located in 'disk/datasources'
@bind("user_csv", "csv useHeaders=true", "disk/datasources", "users.csv").
% Define a rule to map the data from 'user_csv' to the 'user' concept
user(X) :- user_csv(X).
% Declare the output concept 'user' for writing the data to an S3 bucket
@output("user").
% Bind the 'user' concept to a CSV file in the specified S3 bucket
@bind("user", "csv", "s3a://your-s3-bucket/", "user.csv").
In this example, we demonstrate how to read data from a CSV file stored in an S3 bucket.
% Declare the input concept 'user_s3' to read data from the CSV file in the S3 bucket
@input("user_s3").
% Bind the 'user_s3' concept to the CSV file 'user.csv' located in the specified S3 bucket
@bind("user_s3", "csv", "s3a://your-s3-bucket/", "user.csv").
% Define a rule to map the data from 'user_s3' to the 'user' concept
user(X) :- user_s3(X).
% Declare the output concept 'user' for making the data available after reading from S3
@output("user").
Consuming Data via API
This example shows how to connect to an external API endpoint that returns data in JSON, XML, or CSV format.
Below is a full example for ingesting weather data from the Meteomatics API in CSV format.
% Bind the predicate to the API endpoint
@bind(
"meteo",
"api delimiter=';', username='my_username', password='my_password', responseFormat='csv'",
"https://api.meteomatics.com/",
"2025-06-09T00:00:00Z--2025-06-12T00:00:00Z:PT3H/t_2m:C,relative_humidity_2m:p/47.423336,9.377225/csv"
).
% Declare an output predicate
@output("meteo_out").
% Map the data to the output predicate
meteo_out(Valid_Date, T_2m_C, Relative_humidity_2m_p) :- meteo(Valid_Date, T_2m_C, Relative_humidity_2m_p).
Customization
- API format: The pattern above supports any API providing CSV data. For JSON/XML APIs, adjust the responseFormat parameter and the data mapping accordingly (e.g. use json or xml in the @bind); see the sketch after this list.
- Authentication: Many APIs support token-based auth (use token='your_token' instead of username/password).
- Delimiter: Update the delimiter parameter to match your API's CSV format if not using semicolons.
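As a combined illustration, a hedged sketch of a binding for a hypothetical JSON API with token-based authentication (the endpoint URL, path, token value, and field names are placeholders, and responseFormat='json' follows the customization notes above rather than a verified endpoint):
% Bind the predicate to a hypothetical JSON API endpoint using a token
@bind(
"api_data",
"api token='your_token', responseFormat='json'",
"https://api.example.com/",
"v1/observations"
).
% Declare an output predicate and map the fields
@output("api_data_out").
api_data_out(Field1, Field2) :- api_data(Field1, Field2).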
HDFS File system
HDFS (Hadoop Distributed File System) is designed for distributed storage and large-scale data processing. Vadalog can integrate with HDFS by reading from and writing to files stored in HDFS clusters. This example shows how to read a CSV file from an HDFS location and process it within a Prometheux workflow.
% Declare the input concept 'user_csv' to read from the CSV file located in HDFS
@input("user_csv").
% Bind the 'user_csv' concept to the CSV file located in HDFS ('users.csv')
% The file is located in the HDFS directory: hdfs://hdfs-host:9000/user
% The 'useHeaders=true' option indicates that the first row contains column headers
@bind("user_csv", "csv useHeaders=true", "hdfs://hdfs-host:9000/user", "users.csv").
% Define a rule to map the data from 'user_csv' to the 'user' concept
user(X) :- user_csv(X).
% Declare the output concept 'user' for making the processed data available
@output("user").
Sybase Database
Sybase (now SAP ASE) is a relational database management system used for online transaction processing. This example shows how to read data from a Sybase database.
% Declare the input concept 'order_sybase' to read data from the 'orders' table in Sybase
@input("order_sybase").
% Bind the 'order_sybase' concept to the Sybase database using the JDBC connection details
@bind("order_sybase", "sybase host='sybase-host', port=5000, username='myUser', password='myPassw'",
"myDatabase", "orders").
% Define a rule that extracts OrderId, CustomerId, and Amount from the 'orders' table in Sybase
order_sybase_test(OrderId, CustomerId, Amount) :-
order_sybase(OrderId, CustomerId, Amount).
% Declare the output concept 'order_sybase_test' to make the processed data available
@output("order_sybase_test").
Teradata Database
Teradata is a highly scalable relational database often used in enterprise data warehousing. This example shows how to read data from a Teradata database.
% Declare the input concept 'sales_teradata' to read data from the 'sales' table in Teradata
@input("sales_teradata").
% Bind the 'sales_teradata' concept to the Teradata database using the JDBC connection details
@bind("sales_teradata", "teradata host='teradata-host', port=1025, username='myUser', password='myPassw'",
"myDatabase", "sales").
% Define a rule to extract SaleId, ProductId, and SaleAmount from the 'sales' table in Teradata
sales_teradata_test(SaleId, ProductId, SaleAmount) :-
sales_teradata(SaleId, ProductId, SaleAmount).
% Declare the output concept 'sales_teradata_test' for making the processed data available
@output("sales_teradata_test").
Amazon Redshift
Amazon Redshift is a fully managed data warehouse service designed for large-scale data analytics. This example shows how to read data from a Redshift table.
% Declare the input concept 'analytics_redshift' to read data from the 'analytics' table in Redshift
@input("analytics_redshift").
% Bind the 'analytics_redshift' concept to the Redshift database using the JDBC connection details
@bind("analytics_redshift", "redshift host='redshift-cluster.amazonaws.com', port=5439, username='myUser', password='myPassword'",
"analyticsDB", "analytics").
% Define a rule to extract data from the 'analytics' table in Redshift
analytics_redshift_test(AnalysisId, Metric, Value) :-
analytics_redshift(AnalysisId, Metric, Value).
% Declare the output concept 'analytics_redshift_test' for making the processed data available
@output("analytics_redshift_test").
Google BigQuery
Google BigQuery is a serverless, highly scalable, and cost-effective multi-cloud data warehouse. This example demonstrates configuring and querying data from a BigQuery dataset.
Setting up Google Cloud Access
This guide shows how to:
- Enable the required Google Cloud APIs
- Create a service account for Prometheux jobs
- Grant the minimum IAM roles (data access & Storage API)
- Allow a human user to impersonate the service account and obtain short‑lived access tokens
- Create a JSON key file
- Generate a one‑hour access token
Project ID example: project-example-358816
Service account name example: example-sa
Human user example: example@gmail.com
Open a Google Cloud Shell within your Google Project and execute the following commands:
1 Enable required APIs
gcloud services enable compute.googleapis.com bigquery.googleapis.com bigquerystorage.googleapis.com iamcredentials.googleapis.com --project=project-example-358816
2 Create the service account
PROJECT_ID=project-example-358816
SA_NAME=example-sa
gcloud iam service-accounts create "$SA_NAME" --project="$PROJECT_ID" --display-name="Spark BigQuery SA"
Resulting e‑mail:
example-sa@project-example-358816.iam.gserviceaccount.com
3 Grant minimum BigQuery roles to the service account
gcloud projects add-iam-policy-binding "$PROJECT_ID" --member="serviceAccount:$SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" --role="roles/bigquery.dataViewer"
gcloud projects add-iam-policy-binding "$PROJECT_ID" --member="serviceAccount:$SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" --role="roles/bigquery.jobUser"
gcloud projects add-iam-policy-binding "$PROJECT_ID" --member="serviceAccount:$SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" --role="roles/bigquery.readSessionUser"
4 Allow your user to impersonate the service account
gcloud iam service-accounts add-iam-policy-binding "$SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" --member="user:example@gmail.com" --role="roles/iam.serviceAccountTokenCreator"
Verify:
gcloud iam service-accounts get-iam-policy "$SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" --filter="bindings.role:roles/iam.serviceAccountTokenCreator"
5 Create a JSON key file
If you prefer file‑based creds:
gcloud iam service-accounts keys create ~/gcp-credentials.json --iam-account="$SA_NAME@$PROJECT_ID.iam.gserviceaccount.com"
Read the content:
cat ~/gcp-credentials.json
Copy and paste it into a new file on your laptop or environment at /path/to/gcp-credentials.json.
This authMode is the default one. Set the environment variable (in Docker or via export) GOOGLE_APPLICATION_CREDENTIALS=/path/to/gcp-credentials.json, or set bigquery.credentialsFile=/path/to/gcp-credentials.json in the pmtx.properties configuration file, or declare the path via credentialsFile=/path/to/gcp-credentials.json as an option in the bind annotation.
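For example, the environment-variable and pmtx.properties variants look as follows (the bind-annotation variant is shown in the example further below):
# In Docker or your shell
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/gcp-credentials.json
# Or, in pmtx.properties
bigquery.credentialsFile=/path/to/gcp-credentials.json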
6 Generate a one‑hour access token (impersonation)
If you prefer token‑based creds:
gcloud auth print-access-token --impersonate-service-account="$SA_NAME@$PROJECT_ID.iam.gserviceaccount.com"
Enable token-based authMode by setting bigquery.authMode=gcpAccessToken in the pmtx.properties configuration file, or by declaring authMode=gcpAccessToken as an option in the bind annotation. Then set the environment variable (in Docker or via export) GCP_ACCESS_TOKEN=my-token, or set the bigquery.gcpAccessToken=my-token config property, or pass it via gcpAccessToken=my-token as an option in the bind annotation.
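Analogously, a sketch of the environment-variable and pmtx.properties variants for token-based access (assuming the property value mirrors the bind-annotation option):
# In Docker or your shell
export GCP_ACCESS_TOKEN=my-token
# Or, in pmtx.properties
bigquery.authMode=gcpAccessToken
bigquery.gcpAccessToken=my-token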
Example using credentials file
% Bind the BigQuery table with credentials file
@bind("table",
"bigquery credentialsFile=/path/to/gcp-credentials.json",
"project-example-358816",
"bigquery-public-data.thelook_ecommerce.order_items").
% Rule to project the first three columns from the BigQuery table
bigquery_table(X,Y,Z) :- table(X,Y,Z).
% Post-process the BigQuery table to limit the results to 10
@post("bigquery_table","limit(10)").
% Define the output for the BigQuery table
@output("bigquery_table").
Example using token
% Bind the BigQuery table with token
@bind("table",
"bigquery authMode=gcpAccessToken, token=/path/to/gcp-credentials.json",
"project-example-358816",
"bigquery-public-data.thelook_ecommerce.order_items").
% Rule to project the first three columns from the BigQuery table
bigquery_table(X,Y,Z) :- table(X,Y,Z).
% Define the output for the BigQuery table
@output("bigquery_table").
% Post-process the BigQuery table to limit the results to 10
@post("bigquery_table","limit(10)").
Example via query
% Bind a BigQuery query to retrieve product revenue data
@qbind("query",
"bigquery credentialsFile=src/test/resources/bigquery-credentials.json",
"quantum-feat-358816",
"SELECT product_id, SUM(sale_price) AS revenue FROM `bigquery-public-data.thelook_ecommerce.order_items`
WHERE sale_price > 100 GROUP BY product_id ORDER BY revenue DESC LIMIT 10").
% Define a rule to map the query results to the bigquery_query predicate
bigquery_query(X,Y) :- query(X,Y).
% Declare the output for the bigquery_query predicate
@output("bigquery_query").
Snowflake
Snowflake is a cloud-based data warehousing service that allows for data storage, processing, and analytics.
How to Retrieve Your Snowflake Connection Info for reading or writing tables
To obtain the connection details for your Snowflake account:
- Click the user icon in the bottom-left corner of the Snowflake UI.
- Select "Connect a tool to Snowflake".
- Go to the "Connectors / Drivers" section.
- Choose "JDBC" as the connection method.
- Select your warehouse, database, and set "Password" as the authentication method.
This will generate a JDBC connection string in the following format:
jdbc:snowflake://A778xxx-IVxxxx.snowflakecomputing.com/?user=PROMETHEUX&warehouse=COMPUTE_WH&db=TEST&schema=PUBLIC&password=my_password
From this string, you can extract the following values for your bind configuration:
url = 'A778xxx-IVxxxx.snowflakecomputing.com'
username = 'PROMETHEUX'
password = 'my_password'
warehouse = 'COMPUTE_WH'
database = 'TEST'
(note: database names are usually uppercase)
This example demonstrates reading data from a Snowflake table.
% Declare the input concept 'transactions_snowflake' to read data from the 'transaction_data' table in Snowflake
@input("transactions_snowflake").
% Bind the 'transactions_snowflake' concept to the Snowflake database using the JDBC connection details
@bind("transactions_snowflake", "snowflake url='A778xxx-IVxxxx.snowflakecomputing.com', username='PROMETHEUX', password='myPassword', warehouse='COMPUTE_WH'",
"TEST", "transaction_data").
% Define a rule to extract TransactionId, CustomerId, and Amount from the 'transaction_data' table in Snowflake
transactions_snowflake_test(TransactionId, CustomerId, Amount) :-
transactions_snowflake(TransactionId, CustomerId, Amount).
% Declare the output concept 'transactions_snowflake_test' for making the processed data available
@output("transactions_snowflake_test").
Databricks
Databricks is a cloud-based platform for data engineering and data science.
This example demonstrates writing data to a Databricks table.
% Declare the input concept 'sales_postgres' to read data from the 'sales' table in Postgres
@input("sales_postgres").
% Bind the 'sales_postgres' concept to the Postgres database using the JDBC connection details
@qbind("sales_postgres","postgresql host='postgres-host', port=5432, username='prometheux', password='myPassw'",
"postgres", "select sale_id, product_id, sale_amount from sales").
% Define a rule to extract SaleId, ProductId, and SaleAmount from the 'sales' table in Postgres
sales_databricks(SaleId, ProductId, SaleAmount) :-
sales_postgres(SaleId, ProductId, SaleAmount).
% Declare the output concept 'sales_databricks' to write data to the Databricks table
@output("sales_databricks").
@model("sales_databricks", "['sale_id:int', 'productId:int', 'sale_amount:int']").
% Bind the 'sales_databricks' concept to the Databricks cluster using the JDBC connection details
@bind("sales_databricks","databricks batchSize=5, password='dapixxxx', host='dbc-xxxx-02fe.cloud.databricks.com'",
"/sql/1.0/warehouses/3283xxxx", "sales").
This example demonstrates reading data from a Databricks table.
% Declare the input concept 'sales_databricks' to read data from the 'sales' table in Databricks
@input("sales_databricks").
% Bind the 'sales_databricks' concept to the Databricks cluster using the JDBC connection details
@qbind("sales_databricks","databricks fetchSize=5, password='dapixxxx', host='dbc-xxxx-02fe.cloud.databricks.com'",
"/sql/1.0/warehouses/3283xxxx", "select sale_id, productId from sales").
% Define a rule to extract ProductId from the 'sales' table in Databricks
sales(Product) :-
sales_databricks(Sale, Product).
% Declare the output concept 'sales' to return the processed data
@output("sales").