Connecting to Databases and External Data Sources

Prometheux reads from and writes to a wide range of external systems, which lets you integrate and migrate data across platforms. It supports file-based sources (CSV, Parquet, Excel, JSON, COBOL), relational and graph databases, cloud data warehouses, NoSQL stores, REST APIs, and remote storage such as S3 and HDFS.

@bind Options

The bind command configures reading from and writing to databases and data sources:
@bind("concept_name",
      "datasource_type option_1='value_1', option_2='value_2', …",
      "database_name",
      "table_name").
Critical syntax requirements:
  1. Options must be comma-separated: option1='value1', option2='value2'
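  2. String values must be quoted: host='localhost', not host=localhost (numeric and boolean values such as port=5432 or useHeaders=true appear unquoted in most examples on this page)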
  3. Use username= not user= for database authentication
  4. Must end with a dot: @bind(...).
Correct:
@bind("employees", "postgresql host='localhost', port=5432, username='postgres', password='mypass'", "company_db", "employees").
Supported datasource_type values:
Type          Description
csv           CSV files
parquet       Parquet files
excel         Excel files
json          JSON files
cobol         Legacy COBOL / EBCDIC files (with copybook)
postgresql    PostgreSQL databases
neo4j         Neo4j graph databases
db2           DB2 databases
mariadb       MariaDB databases
oracle        Oracle databases
sqlite        SQLite databases
mysql         MySQL databases
sqlserver     SQL Server databases
h2            H2 databases
sybase        Sybase databases
teradata      Teradata databases
redshift      Amazon Redshift
bigquery      Google BigQuery
hive          Apache Hive
presto        Presto
snowflake     Snowflake
databricks    Databricks
dynamodb      Amazon DynamoDB
api           REST APIs
text          Plain text files
binaryfile    Binary files (PDF, images, etc.)
Common connection options:
  • url: Full JDBC URL (e.g. jdbc:postgresql://localhost:5432/prometheux)
  • host: Database host
  • port: Database port
  • database: Database name
  • username: Login username
  • password: Login password
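As a rough illustration of the url option listed above, the sketch below binds a PostgreSQL table through a full JDBC URL instead of separate host and port options. The database company_db, the two-column employees table, and the credentials are placeholders. Passing username and password alongside url for PostgreSQL is an assumption, based on how the Snowflake binding on this page combines url with those options.

```vadalog
% Assumption: url replaces host/port, and credentials stay as separate options.
@bind("employees",
      "postgresql url='jdbc:postgresql://localhost:5432/company_db', username='postgres', password='mypass'",
      "company_db",
      "employees").

% Hypothetical two-column table.
employee_copy(Id, Name) <- employees(Id, Name).
@output("employee_copy").
```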

Configuring Credentials

Credentials can be specified directly in @bind annotations or stored in px.properties for centralized management:
@bind("concept_name",
      "datasource_type username='user', password='pass'",
      "database_name",
      "table_name").
Credentials can also be managed via REST API endpoints.

CSV Datasource

Prometheux supports CSV files for both reading and writing. By default, all fields are treated as strings, and the value \N is read as null (a labelled null).

@bind options

@bind("relation",
      "csv option_1='value_1', option_2='value_2', …",
      "filepath",
      "filename").
Available options:
  • useHeaders: true or false — whether headers are present
  • delimiter: Field separator character
  • recordSeparator: Record separator
  • quoteMode: all, minimal, non_numeric, or none
  • nullString: String to use for null values
  • multiline: true to handle multi-line fields
  • coalesce: true to produce a single output file (standalone environments only)
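Several of the options above can be combined in one option block. The sketch below assumes a hypothetical semicolon-delimited file readings.csv with a header row and three columns, in which missing values are written as NA. The file name, column layout, and option values are illustrative only.

```vadalog
% Hypothetical file and columns; option names come from the list above.
@bind("readings",
      "csv useHeaders=true, delimiter=';', nullString='NA'",
      "/path_to_csv/folder",
      "readings.csv").

reading(Sensor, Time, Value) <- readings(Sensor, Time, Value).
@output("reading").
```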

Examples

Simple read:
@bind("myCsv", "csv", "/path_to_csv/folder", "csv_name.csv").
myAtom(X,Y,Z) <- myCsv(X,Y,Z).
@output("myAtom").
With column mapping:
@bind("myCsv", "csv useHeaders=true", "/path_to_csv/folder", "csv_name.csv").
@mapping("myCsv", 0, "var1", "string").
@mapping("myCsv", 1, "var2", "string").
@mapping("myCsv", 2, "var3", "string").

myAtom(X,Y,Z) <- myCsv(X,Y,Z).
@output("myAtom").
Selecting columns and filtering with SQL:
@bind("users", "csv useHeaders=true", "/path_to_csv/folder", "users.csv").

filtered_users() <- SELECT Name, Surname, Age FROM users WHERE Age > 10.
@output("filtered_users").
Writing filtered results to a new CSV:
@bind("users", "csv useHeaders=true", "/path_to_csv/folder", "users.csv").

young_users() <- SELECT Name, Surname, Age FROM users WHERE Age < 25.

@output("young_users").
@bind("young_users", "csv", "/another_csv_path/folder/output", "young_users").

Parquet Datasource

Parquet is a columnar storage format optimized for large-scale data lake scenarios.
@bind("shipping_parquet", "parquet", "disk/data/input_files", "shipping.parquet").

shipping_parquet_test(OrderId, ShippingDate) <- shipping_parquet(OrderId, ShippingDate).

@output("shipping_parquet_test").

Excel Datasource

Write from CSV to Excel:
@bind("shipping_excel_csv", "csv useHeaders=true", "disk/data/generated_data", "shipping_data_excel.csv").

@model("shipping_excel", "['OrderId:int','ShippingDate:date']").

shipping_excel(OrderId, ShippingDate) <- shipping_excel_csv(OrderId, ShippingDate).

@bind("shipping_excel", "excel useHeaders=true", "disk/data/input_files", "shipping.xls").
@output("shipping_excel").
Read from Excel:
@bind("shipping_excel", "excel useHeaders=true", "disk/data/input_files", "shipping.xls").

shipping_excel_test(OrderId, ShippingDate) <- shipping_excel(OrderId, ShippingDate).
@output("shipping_excel_test").
Read from a specific sheet:
@bind("shipping_excel_sheet", "excel useHeaders=true, dataAddress=''Sheet1'!A1'", "disk/data/input_files", "shipping.xls").

shipping_excel_sheet_test(OrderId, ShippingDate) <- shipping_excel_sheet(OrderId, ShippingDate).
@output("shipping_excel_sheet_test").

JSON Datasource

Prometheux reads JSON files and automatically infers nested objects as struct types. Nested fields are accessible via dot notation in SQL or via struct:get in Vadalog rules.

Simple Read

@bind("orders", "json", "data/orders", "orders.json").

all_orders(OrderId, Customer, Total) <- orders(OrderId, _, Customer, _, _, Total, _).
@output("all_orders").

SQL Queries on Nested Fields

@bind("orders", "json", "data/orders", "orders.json").

customer_orders() <- SELECT order_id, 
                            customer.name AS customer_name, 
                            customer.email AS email, 
                            total_amount 
                     FROM orders 
                     WHERE status = 'shipped'.

@output("customer_orders").
Aggregation:
@bind("orders", "json", "data/orders", "orders.json").

orders_by_city() <- SELECT shipping_address.city AS city, 
                           COUNT(*) AS order_count,
                           SUM(total_amount) AS total_revenue
                    FROM orders 
                    GROUP BY shipping_address.city.

@output("orders_by_city").
Parameterized query:
@param("min_amount", "500").
@param("target_state", "CA").

@bind("orders", "json", "data/orders", "orders.json").

filtered_orders() <- SELECT order_id, customer.name AS customer_name, total_amount 
                     FROM orders 
                     WHERE total_amount > ${min_amount}
                       AND shipping_address.state = '${target_state}'.

@output("filtered_orders").

Using struct:get

@bind("orders", "json", "data/orders", "orders.json").

order_customers(OrderId, CustomerName, Email) <- 
    orders(OrderId, _, Customer, _, _, _, _), 
    CustomerName = struct:get("name", Customer), 
    Email = struct:get("email", Customer).

@output("order_customers").

Query Option in @bind

@bind("high_value_orders", 
      "json query='SELECT order_id, customer.name AS customer_name, total_amount FROM high_value_orders WHERE total_amount > 1000'", 
      "data/orders", 
      "orders.json").

result(OrderId, CustomerName, Amount) <- high_value_orders(OrderId, CustomerName, Amount).
@output("result").
Root-level arrays vs nested arrays: When a JSON file contains a root-level array (e.g., [{...}, {...}]), each element becomes a separate row automatically. When the array is nested inside an object (e.g., {"items": [{...}]}), use collections:explode to convert array elements into rows.
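A minimal sketch of the nested case, modeled on the collections:explode usage in the REST API section of this page. It assumes a hypothetical file catalog.json of the form {"items": [{"sku": ..., "price": ...}]}, so that the bound relation has a single column holding the whole array. The file, field names, and relation names are not part of the original examples.

```vadalog
% Hypothetical file: {"items": [{"sku": "A1", "price": 10}, ...]}
@bind("catalog", "json", "data/catalog", "catalog.json").

% Turn the nested array into one row per element.
item(Item) <- catalog(Items),
              Item = collections:explode(Items).

% Extract fields from each element.
item_price(Sku, Price) <- item(Item),
                          Sku = struct:get("sku", Item),
                          Price = struct:get("price", Item).

@output("item_price").
```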

COBOL Datasource

Prometheux can read legacy COBOL / EBCDIC data files — VSAM dumps, flat record files, variable-length RDW/BDW streams — and join them with other sources using standard Vadalog rules. The connector is powered by AbsaOSS Cobrix. A COBOL binding always needs:
  1. A data file (addressed by filepath + filename)
  2. A copybook describing the record layout — passed via the copybook option as a path to a .cpy file
The @bind tokenizer strips \n and \t from the option block and does not support escaped single quotes, so multi-line copybook bodies cannot be embedded inline. Keep the copybook as a file and reference it by path.

@bind options

@bind("relation",
      "cobol option_1='value_1', option_2='value_2', …",
      "filepath",
      "filename").
  • copybook: required — path to the COBOL copybook file
  • cobolPreset: shortcut for common mainframe framings:
    • flat-fixed-ebcdic (default) — fixed-length records, EBCDIC encoding
    • flat-fixed-ascii — fixed-length records, ASCII encoding
    • mainframe-v — IBM variable-length records with 4-byte RDW header
    • mainframe-vb — IBM variable-blocked: BDW-prefixed blocks of RDW-prefixed records
    • custom — no preset; supply all Cobrix flags via cobrix.* options
  • encoding: Override encoding inferred by preset (ebcdic, ascii)
  • record_format: F (fixed), V (variable), VB (variable-blocked)
  • cobolFlattenPolicy:
    • dotted (default) — flatten nested groups into prefixed top-level columns
    • keepNested — preserve nested structs as map values (accessible via struct:get)
  • cobolFileExtensions: Filter files in a directory (e.g., .dat,.bin)
  • cobrix.<flag>: Passthrough to the Cobrix reader verbatim

Example: Fixed-Length EBCDIC File

@bind("customer_master",
      "cobol copybook='/data/cobol/customer_master.cpy', cobolPreset='flat-fixed-ebcdic'",
      "/data/cobol",
      "customer_master.dat").

italian_customers(Id, Name, Status, City, Balance) <-
    SELECT CUST_REC_CUST_ID          AS id,
           CUST_REC_CUST_NAME        AS name,
           CUST_REC_CUST_STATUS      AS status,
           CUST_REC_CUST_ADDRESS_CUST_CITY AS city,
           CUST_REC_CUST_BALANCE     AS balance
    FROM customer_master
    WHERE CUST_REC_CUST_ADDRESS_CUST_COUNTRY = 'ITA'
      AND CUST_REC_CUST_STATUS = 'A'.

@output("italian_customers").

Example: Variable-Length Mainframe File

@bind("transactions",
      "cobol copybook='/data/cobol/transactions.cpy', cobolPreset='mainframe-v'",
      "/data/cobol",
      "transactions.bin").

high_value_tx(Id, Amount) <-
    transactions(Id, _, _, Amount, _),
    Amount > 10000.

@output("high_value_tx").

Example: Advanced Cobrix Overrides

@bind("legacy_stream",
      "cobol copybook='/data/cobol/legacy.cpy',
             cobolPreset='custom',
             encoding='ebcdic',
             record_format='VB',
             cobrix.is_rdw_big_endian='true',
             cobrix.rdw_adjustment='-4',
             cobrix.bdw_adjustment='-4',
             cobrix.is_rdw_part_of_record_length='false'",
      "s3a://my-legacy-bucket/cobol",
      "legacy.bin").

result(Id, Code) <- legacy_stream(Id, Code, _).
@output("result").

Keeping Nested Groups as Structs

@bind("customer_master",
      "cobol copybook='/data/cobol/customer_master.cpy',
             cobolPreset='flat-fixed-ebcdic',
             cobolFlattenPolicy='keepNested'",
      "/data/cobol",
      "customer_master.dat").

customer_city(Id, City) <-
    customer_master(Id, _, _, Address, _, _, _, _),
    City = struct:get("CUST_CITY", Address).

@output("customer_city").

Querying COBOL Data with SQL

@bind("customer_master",
      "cobol copybook='/data/cobol/customer_master.cpy', cobolPreset='flat-fixed-ebcdic'",
      "/data/cobol",
      "customer_master.dat").

customer_preview() <- SELECT * FROM customer_master LIMIT 10 OFFSET 0.
customer_count(Total) <- SELECT COUNT(*) AS total FROM customer_master.

@output("customer_preview").
@output("customer_count").
For a complete COBOL example with cross-source JOINs and compliance rules, see From Mainframe to Modern: Migrating Card Clearing.
Tips:
  • PIC S9(n)V99 COMP-3 (packed decimal) is exposed as double in Vadalog
  • PIC 9(n) COMP binary counters are exposed as int or long
  • OCCURS n TIMES groups become Vadalog list columns, expandable with collections:explode
  • Point filepath to a directory to read all matching extracts; use cobolFileExtensions to filter
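The OCCURS tip above could look like the following sketch. It assumes a hypothetical copybook accounts.cpy that declares an account id followed by a PHONES group with OCCURS 3 TIMES, and that the resulting relation therefore has two columns. The real column count and order depend on your copybook and flatten policy.

```vadalog
% Hypothetical copybook: an id field followed by a PHONES group with OCCURS 3 TIMES.
@bind("accounts",
      "cobol copybook='/data/cobol/accounts.cpy', cobolPreset='flat-fixed-ebcdic'",
      "/data/cobol",
      "accounts.dat").

% Assumption: two columns, the id and the list built from the OCCURS group.
account_phone(Id, Phone) <- accounts(Id, Phones),
                            Phone = collections:explode(Phones).

@output("account_phone").
```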

PostgreSQL Database

Write data from CSV to PostgreSQL:
@bind("customer_postgres_csv", "csv useHeaders=true", "disk/data/generated_data", "customer_postgres.csv").

customer_postgres(CustomerID, Name, Surname, Email) <- 
        customer_postgres_csv(CustomerID, Name, Surname, Email).

@model("customer_postgres", "['customer_id:int', 'name:string', 'surname:string', 'email:string']").
@output("customer_postgres").
@bind("customer_postgres", "postgresql host='postgres-host', port=5432, username='prometheux', password='myPassw'", 
      "prometheux", "customer").
Read a full table:
@bind("customer_postgres", "postgresql host='postgres-host', port=5432, username='prometheux', password='myPassw'", 
      "prometheux", "customer").

customer_postgres_test(CustomerID, Name, Surname, Email) <- 
        customer_postgres(CustomerID, Name, Surname, Email).

@output("customer_postgres_test").
Read with a SQL query:
@qbind("customer_postgres", "postgresql host='postgres-host', port=5432, username='prometheux', password='myPassw', database='prometheux'", 
      "", "select CustomerID, Email from customer where CustomerID > 0").

customer_postgres_test(CustomerID, Email) <- 
        customer_postgres(CustomerID, Email), OnlyPx = ends_with(Email, "prometheux.ai"), OnlyPx = #T.

@output("customer_postgres_test").

PostgreSQL with Supabase

Connect using the JDBC Transaction Pooler (recommended):
@bind("owns", "postgresql url='jdbc:postgresql://aws-1-eu-west-1.pooler.supabase.com:6543/postgres?user=postgres.[YOUR_PROJECT_ID]&password=[YOUR_PASSWORD]'",
      "postgres", "owns").

out(X, Y, Z) <- owns(X, Y, Z).
@output("out").
Or with individual parameters:
@bind("owns", "postgresql host='aws-1-eu-west-1.pooler.supabase.com', port='6543', username='postgres.[YOUR_PROJECT_ID]', password='[YOUR_PASSWORD]'",
      "postgres", "owns").

out(X, Y, Z) <- owns(X, Y, Z).
@output("out").
Supabase connection modes:
  • Transaction Pooler (port 6543): Recommended for serverless and short-lived connections
  • Session Pooler (port 5432): For long-lived connections needing session features
  • Direct Connection (port 5432): Direct connection without pooling
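For comparison with the Transaction Pooler bindings above, a Session Pooler binding would differ only in the port, assuming your project's Session Pooler is served from the same pooler host. Treat this as a sketch and confirm the host and port against the connection string shown in your Supabase dashboard.

```vadalog
% Assumption: same pooler host as the Transaction Pooler example, port 5432.
@bind("owns", "postgresql host='aws-1-eu-west-1.pooler.supabase.com', port='5432', username='postgres.[YOUR_PROJECT_ID]', password='[YOUR_PASSWORD]'",
      "postgres", "owns").

out(X, Y, Z) <- owns(X, Y, Z).
@output("out").
```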
For enhanced security, create a dedicated read-only user in the Supabase SQL Editor:
CREATE USER vadalog_reader WITH PASSWORD '[YOUR_SECURE_PASSWORD]';
GRANT CONNECT ON DATABASE postgres TO vadalog_reader;
GRANT USAGE ON SCHEMA public TO vadalog_reader;
GRANT SELECT ON public.owns TO vadalog_reader;
After creating the user and granting permissions, it may take a few minutes for the changes to propagate. If you encounter connection issues, wait 5 minutes and try again.

MariaDB Database

@bind("order_mariadb", "mariadb host='mariadb-host', port=3306, username='prometheux', password='myPassw'", 
      "prometheux", "order_customer").

order_mariadb_test(OrderId, CustomerId, Cost) <- 
        order_mariadb(OrderId, CustomerId, Cost).

@output("order_mariadb_test").

Neo4j Database

Write nodes and relationships from CSV:
@bind("persons_order_neo4j_csv", "csv useHeaders=true", "disk/data/generated_data", "persons_order_neo4j.csv").

person_neo4j(CustomerId, Name, Surname, Email) <- 
        persons_order_neo4j_csv(CustomerId, Name, Surname, Email, OrderId, Cost).

@output("person_neo4j").
@bind("person_neo4j", "neo4j username='neo4j', password='myPassw', host='neo4j-host', port=7680", "neo4j", "(:Person)").
@model("person_neo4j", "['customerId(ID):int', 'name:string', 'surname:string', 'email:string']").

order(OrderId, Cost) <- 
        persons_order_neo4j_csv(CustomerId, Name, Surname, Email, OrderId, Cost).

@output("order").
@bind("order", "neo4j username='neo4j', password='myPassw', host='neo4j-host', port=7680", "neo4j", "(:Order)").
@mapping("order", 0, "orderId(ID)", "int").
@mapping("order", 1, "cost", "string").

order_person_rel_neo4j(OrderId, CustomerId) <- 
        persons_order_neo4j_csv(CustomerId, Name, Surname, Email, OrderId, Cost).

@output("order_person_rel_neo4j").
@bind("order_person_rel_neo4j", "neo4j username='neo4j', password='myPassw', host='neo4j-host', port=7680", "neo4j", "(:Order)-[IS_RELATED_TO]->(:Person)").
@mapping("order_person_rel_neo4j", 0, "orderId:orderId(sID)", "int").
@mapping("order_person_rel_neo4j", 1, "customerId:customerId(tID)", "int").
Query with Cypher:
@qbind("persons_order_neo4j", "neo4j username='neo4j', password='myPassw', host='neo4j-host', port=7680", "", 
       "MATCH (o:Order)-[r:IS_RELATED_TO]->(p:Person) RETURN o.orderId, p.customerId").

persons_order_neo4j_test(CustomerId, OrderId) <- 
        persons_order_neo4j(OrderId, CustomerId).

@output("persons_order_neo4j_test").

Querying Neo4j with SQL

Prometheux automatically translates SQL to optimized Cypher, pushing it down to the Neo4j server:
@bind("person_db", "neo4j username='neo4j', password='myPassw', host='neo4j-host', port=7680", "neo4j", "(:Person)").

% Preview with LIMIT
person_preview() <- SELECT * FROM person_db LIMIT 10.

% Count nodes
person_count(Total) <- SELECT COUNT(*) AS total FROM person_db.

@output("person_preview").
@output("person_count").
Relationships also support SQL:
@bind("friend_of_db", "neo4j username='neo4j', password='myPassw', host='neo4j-host', port=7680", "neo4j", "(:Person)-[:FRIEND_OF]->(:Person)").

friendship_count(Total) <- SELECT COUNT(*) AS total FROM friend_of_db.
@output("friendship_count").
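SQL-to-Neo4j translation: SELECT * with LIMIT/OFFSET generates Cypher with SKIP/LIMIT; COUNT(*) generates a Cypher count query. Because the query is pushed down to the Neo4j server, only the requested rows or the count are returned rather than the full set of nodes or relationships.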
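As an illustration of the LIMIT/OFFSET translation described above, the sketch below pages through Person nodes. The page size and offset are arbitrary, and the exact Cypher that Prometheux generates is not documented in this section, so the comment only states the expected shape.

```vadalog
@bind("person_db", "neo4j username='neo4j', password='myPassw', host='neo4j-host', port=7680", "neo4j", "(:Person)").

% Third page of 10 rows; expected to become SKIP 20 LIMIT 10 on the Neo4j side.
person_page() <- SELECT * FROM person_db LIMIT 10 OFFSET 20.

@output("person_page").
```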

Amazon DynamoDB

@bind Options

  • region: AWS region (e.g., us-east-1)
  • username: AWS Access Key ID
  • password: AWS Secret Access Key
  • sessionToken: Session Token for temporary credentials
  • endpointOverride: Custom endpoint URL (for DynamoDB Local)
  • partitionKey: Partition key attribute name for table creation
  • sortKey: Sort key attribute name (optional)
  • billingMode: PAY_PER_REQUEST (default) or PROVISIONED
  • readCapacity / writeCapacity: Capacity units for provisioned mode
  • writeBatchSize: Items per batch (1–25, default 25)
  • readPageSize: Page size for read operations (default 100)
  • totalSegments: Segments for parallel scanning (default 8)
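The table-creation options above might be combined as in the following sketch, which writes a hypothetical orders.csv into a provisioned table with a partition key and a sort key. The table, file, column names, and capacity values are assumptions, as is writing the numeric capacities unquoted in the same way port values are written elsewhere on this page.

```vadalog
% Hypothetical input file with three columns.
@bind("orders_csv", "csv useHeaders=true", "disk/data/input", "orders.csv").

@model("orders_dynamodb", "['user_id:string', 'order_date:string', 'total_amount:int']").

orders_dynamodb(UserId, OrderDate, Amount) <- 
        orders_csv(UserId, OrderDate, Amount).

@output("orders_dynamodb").
% Assumption: readCapacity/writeCapacity take unquoted numbers.
@bind("orders_dynamodb", 
      "dynamodb region='us-east-1', username='AKIAIOSFODNN7EXAMPLE', password='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY', partitionKey='user_id', sortKey='order_date', billingMode='PROVISIONED', readCapacity=5, writeCapacity=5", 
      "us-east-1", "orders").
```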
Write to DynamoDB:
@bind("users_csv", "csv useHeaders=true", "disk/data/input", "users.csv").

@model("users_dynamodb", "['id:string', 'name:string', 'email:string', 'age:int']").

users_dynamodb(Id, Name, Email, Age) <- 
        users_csv(Id, Name, Email, Age).

@output("users_dynamodb").
@bind("users_dynamodb", 
      "dynamodb username='AKIAIOSFODNN7EXAMPLE', password='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY', partitionKey='id', billingMode='PAY_PER_REQUEST'", 
      "us-east-1", "users").
Read from DynamoDB:
@bind("users_dynamodb", 
      "dynamodb region='us-east-1', username='AKIAIOSFODNN7EXAMPLE', password='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'", 
      "us-east-1", "users").

young_users(Id, Name, Email) <- 
        users_dynamodb(Id, Name, Email, Age), Age < 30.

@output("young_users").
PartiQL query:
@qbind("user_orders", 
       "dynamodb username='AKIAIOSFODNN7EXAMPLE', password='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'", 
       "us-east-1", 
       "SELECT user_id, order_date, total_amount FROM orders WHERE user_id = 'u123' AND begins_with(order_date, '2025')").

recent_orders(UserId, OrderDate, Amount) <- 
        user_orders(UserId, OrderDate, Amount).

@output("recent_orders").
DynamoDB Local for development:
@bind("test_data", 
      "dynamodb region='us-east-1', username='test', password='test', endpointOverride='http://localhost:8000', partitionKey='id', billingMode='PAY_PER_REQUEST'", 
      "testdb", "test_table").

S3 Storage

Write to S3:
@bind("user_csv", "csv useHeaders=true", "disk/datasources", "users.csv").

user(X) <- user_csv(X).

@output("user").
@bind("user", "csv", "s3a://your-s3-bucket/", "user.csv").
Read from S3:
@bind("user_s3", "csv", "s3a://your-s3-bucket/", "user.csv").

user(X) <- user_s3(X).
@output("user").

Consuming Data via REST API

The API datasource supports JSON (default), CSV, and XML response formats, with full SQL integration for querying nested structures.

Configuration Options

  • responseFormat: json (default), csv, or xml
  • authType: basic, bearer, or apikey
  • username / password: For basic authentication
  • token: For bearer authentication
  • apikey: For API key authentication
  • headers: Custom headers as key1:value1,key2:value2
  • delimiter: CSV delimiter (default ,)

Quick Start

% Bitcoin price (no auth required)
@bind("btc", "api", "https://api.coingecko.com/api/v3/", "coins/bitcoin").
result() <- SELECT id, symbol, market_data.current_price.usd AS price FROM btc.
@output("result").
% REST Countries API
@bind("countries", "api", "https://restcountries.com/v3.1/", "all?fields=name,region,population").

regional_stats() <- SELECT region,
                           COUNT(*) AS country_count,
                           SUM(population) AS total_population
                    FROM countries
                    GROUP BY region
                    ORDER BY total_population DESC.

@output("regional_stats").

Querying Nested JSON

@bind("github_issues", 
      "api authType=bearer, token=${GITHUB_TOKEN}", 
      "https://api.github.com/repos/", "owner/repo/issues").

open_bugs() <- SELECT title, 
                      user.login AS author, 
                      labels[0].name AS primary_label,
                      created_at
               FROM github_issues 
               WHERE state = 'open' 
                 AND labels[0].name = 'bug'
               ORDER BY created_at DESC.

@output("open_bugs").

Joining Multiple API Sources

When joining multiple API sources, refer to columns by their renamed form predicate_name_i (for example, users_api_2):
@bind("users_api", "api", "https://jsonplaceholder.typicode.com/", "users").
@bind("posts_api", "api", "https://jsonplaceholder.typicode.com/", "posts").

user_posts() <- SELECT u.users_api_4 AS author_name, 
                       u.users_api_2 AS email,
                       p.posts_api_2 AS post_title
                FROM users_api u 
                JOIN posts_api p ON u.users_api_3 = p.posts_api_3
                WHERE u.users_api_3 <= 5
                LIMIT 20.

@output("user_posts").

Prometheus Metrics

@bind("up_metric", "api", "http://prometheus-server:9090/api/v1/", "query?query=up").

result_linear(Result) <-
    up_metric(Data, Status),
    Status = "success",
    Results = struct:get("result", Data),
    Result = collections:explode(Results).

result(Job, Instance, Timestamp, Value) <-
    result_linear(Result),
    Metric = struct:get("metric", Result),
    Job = struct:get("job", Metric),
    Instance = struct:get("instance", Metric),
    ValueArray = struct:get("value", Result),
    Timestamp = collections:get(ValueArray, 0),
    Value = collections:get(ValueArray, 1).

@output("result").

CSV Format API

@bind("meteo",
      "api delimiter=';', username='my_username', password='my_password', responseFormat=csv",
      "https://api.meteomatics.com/",
      "2025-06-09T00:00:00Z--2025-06-12T00:00:00Z:PT3H/t_2m:C,relative_humidity_2m:p/47.423336,9.377225/csv").

hot_days() <- SELECT Valid_Date, T_2m_C 
              FROM meteo 
              WHERE T_2m_C > 25
              ORDER BY T_2m_C DESC.

@output("hot_days").

Authentication Summary

Auth Type        Parameters                                         Example
Bearer Token     authType=bearer, token=<token>                     GitHub, Kubernetes, Prometheus
Basic Auth       authType=basic, username=<user>, password=<pass>   Private APIs
API Key          authType=apikey, apikey=<key>                      REST APIs
Custom Headers   headers=<key1>:<value1>,<key2>:<value2>            APIs with custom headers
No Auth          (none)                                             Public APIs
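The API key row above has no example elsewhere on this page, so here is a minimal sketch. The endpoint https://api.example.com/v1/, the tickets resource, and the id and status fields are hypothetical, and reading the key from an API_KEY environment variable follows the same ${...} pattern used for bearer tokens.

```vadalog
% Hypothetical endpoint and fields; the key is read from an environment variable.
@bind("tickets",
      "api authType=apikey, apikey=${API_KEY}",
      "https://api.example.com/v1/", "tickets").

open_tickets() <- SELECT id, status
                  FROM tickets
                  WHERE status = 'open'.

@output("open_tickets").
```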

Working with Arrays

When to use collections:explode:
  • Root-level array response [{item1}, {item2}] — each element is a row automatically, no explode needed
  • Nested array field {"leagues": [{item1}, {item2}]} — 1 row with the whole array; use collections:explode
@bind("leagues_api", "api", "https://www.thesportsdb.com/api/v1/json/3/", "all_leagues.php").

leagues_linear(League) <- leagues_api(LeaguesArray), 
                          League = collections:explode(LeaguesArray).

result(LeagueId, LeagueName) <- leagues_linear(League), 
                                LeagueId = struct:get("idLeague", League), 
                                LeagueName = struct:get("strLeague", League).

@output("result").

Best Practices

  1. JSON is the default responseFormat — omit it for JSON APIs
  2. Store tokens in environment variables: token=${API_TOKEN}
  3. Use SQL in rule bodies for filtering and aggregations
  4. Access nested fields with dot notation: market_data.current_price.usd
  5. Use SIZE(array_field) for reliable array length checks
  6. For JOINs between multiple sources, use renamed column references (predicate_name_i)
  7. Be mindful of rate limits on free-tier APIs
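Practice 5 can be sketched with the GitHub issues binding from earlier on this page. The query below is an assumption about how SIZE would typically be used: it keeps only issues that carry at least one label before reading labels[0]. The owner/repo path remains a placeholder.

```vadalog
@bind("github_issues", 
      "api authType=bearer, token=${GITHUB_TOKEN}", 
      "https://api.github.com/repos/", "owner/repo/issues").

% Only read labels[0] on issues that have at least one label.
labelled_issues() <- SELECT title, 
                            labels[0].name AS primary_label
                     FROM github_issues 
                     WHERE SIZE(labels) > 0.

@output("labelled_issues").
```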

Text Files

@model("location","['Name:string', 'Description:string']").

@bind("location","text","/path/to/file","hansel_gretel_excerpt.txt").

location_head(Name,Description) <- location(Name,Description).
@output("location_head").

Binary Files

Read from a PDF:
@bind("person","binaryfile","/path/to/file","hansel_gretel_excerpt.pdf").

person_head(Name,Role) <- person(Name,Role).
@output("person_head").
Structured business documents (invoices, ID documents, receipts, tax forms, mortgage documents) are also supported via documentType:
@bind("iNV_pdf","binaryfile documentType='invoice'","/path/to/invoice","INV.pdf").
Supported document types include: invoice, receipt.retailMeal, idDocument.passport, tax.us.w2, mortgage.us.1003, contract, and many more.

HDFS

@bind("user_csv", "csv useHeaders=true", "hdfs://hdfs-host:9000/user", "users.csv").

user(X) <- user_csv(X).
@output("user").

Other Database Connectors

Sybase

@bind("order_sybase", "sybase host='sybase-host', port=5000, username='myUser', password='myPassw'", 
      "myDatabase", "orders").

order_sybase_test(OrderId, CustomerId, Amount) <- 
        order_sybase(OrderId, CustomerId, Amount).

@output("order_sybase_test").

Teradata

@bind("sales_teradata", "teradata host='teradata-host', port=1025, username='myUser', password='myPassw'", 
      "myDatabase", "sales").

sales_teradata_test(SaleId, ProductId, SaleAmount) <- 
        sales_teradata(SaleId, ProductId, SaleAmount).

@output("sales_teradata_test").

Amazon Redshift

@bind("analytics_redshift", "redshift host='redshift-cluster.amazonaws.com', port=5439, username='myUser', password='myPassword'", 
      "analyticsDB", "analytics").

analytics_redshift_test(AnalysisId, Metric, Value) <- 
        analytics_redshift(AnalysisId, Metric, Value).

@output("analytics_redshift_test").

Google BigQuery

Obtain credentials from the Google Cloud Console, then use either a credentials file or an access token:
% With credentials file
@bind("table",
      "bigquery credentialsFile='/path/to/gcp-credentials.json'",
      "project-example-358816",
      "bigquery-public-data.thelook_ecommerce.order_items").

bigquery_table(X,Y,Z) <- table(X,Y,Z).
@post("bigquery_table","limit(10)").
@output("bigquery_table").
% With access token
@bind("table",
      "bigquery authMode=gcpAccessToken, gcpAccessToken='my-gcp-access-token'",
      "project-example-358816",
      "bigquery-public-data.thelook_ecommerce.order_items").

bigquery_table(X,Y,Z) <- table(X,Y,Z).
@output("bigquery_table").
% Via query
@qbind("query",
       "bigquery credentialsFile='/path/to/gcp-credentials.json'",
       "project-example-358816",
       "SELECT product_id, SUM(sale_price) AS revenue FROM `bigquery-public-data.thelook_ecommerce.order_items` 
        WHERE sale_price > 100 GROUP BY product_id ORDER BY revenue DESC LIMIT 10").

bigquery_query(X,Y) <- query(X,Y).
@output("bigquery_query").

Snowflake

Retrieve your connection info from the Snowflake UI: user icon → “Connect a tool to Snowflake” → Connectors/Drivers → JDBC.
% With password
@bind("transactions_snowflake", "snowflake url='A778xxx-IVxxxx.snowflakecomputing.com', username='PROMETHEUX', password='myPassword', warehouse='COMPUTE_WH'", 
      "TEST", "transaction_data").

transactions_snowflake_test(TransactionId, CustomerId, Amount) <- 
        transactions_snowflake(TransactionId, CustomerId, Amount).

@output("transactions_snowflake_test").
Snowflake also supports Programmatic Access Tokens (PAT) as an alternative to password authentication — use the same syntax and provide the PAT value as password.
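A sketch of the PAT variant, reusing the binding above. Only the password value changes; [YOUR_PAT] is a placeholder for the token generated in Snowflake.

```vadalog
% Same syntax as password authentication; the PAT value goes in password.
@bind("transactions_snowflake", "snowflake url='A778xxx-IVxxxx.snowflakecomputing.com', username='PROMETHEUX', password='[YOUR_PAT]', warehouse='COMPUTE_WH'", 
      "TEST", "transaction_data").

transactions_snowflake_test(TransactionId, CustomerId, Amount) <- 
        transactions_snowflake(TransactionId, CustomerId, Amount).

@output("transactions_snowflake_test").
```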

Databricks

Write to Databricks:
@qbind("sales_postgres","postgresql host='postgres-host', port=5432, username='prometheux', password='myPassw'",
      "postgres", "select sale_id, product_id, sale_amount from sales").

sales_databricks(SaleId, ProductId, SaleAmount) <- 
        sales_postgres(SaleId, ProductId, SaleAmount).

@output("sales_databricks").
@model("sales_databricks", "['sale_id:int', 'productId:int', 'sale_amount:int']").
@bind("sales_databricks","databricks batchSize=5, OAuth2ClientId='your_client_id', OAuth2Secret='dosexxxx', host='dbc-xxxx-02fe.cloud.databricks.com'",
      "/sql/1.0/warehouses/3283xxxx", "sales").
Read from Databricks:
@qbind("sales_databricks","databricks fetchSize=5, authMode='PAT', token='dapixxxx', host='dbc-xxxx-02fe.cloud.databricks.com'",
      "/sql/1.0/warehouses/3283xxxx", "select sale_id, productId from sales").

sales(Product) <- 
        sales_databricks(Sale, Product).

@output("sales").