Transformations
When you create a connection to a Macrometa collection, you can add a transformation. These transformations are similar to Macrometa stream workers, which help you change your data using Stream QL in stream worker queries.
Stream QL is documented in Stream Worker Queries, but only portions of that section apply to transformations.
This page explains the difference between transformations and stream workers and lists the portions of Stream QL (the language used in stream worker queries) you can and cannot use in transformations.
Transformations vs. Stream Workers
You can think of transformations as lightweight stream workers. You can use either one depending on your data pipeline and processing needs.
Input and Output
Transformations have input and output defined automatically, and you cannot change them.
You can define multiple inputs and outputs for stream workers, but you must set them up yourself.
Stream workers support multiple output actions, including INSERT INTO, UPDATE, and DELETE. Transformations only support INSERT INTO, which allows you to insert transformed data into the output target.
Queries
Transformations allow you to write one query, while stream workers allow you to write several queries. For more information about stream worker queries, refer to Stream Worker Queries.
Workflow
Transformations only process, or transform, content going into or out of Macrometa collections via a connection.
Stream workers process data streams, which can include Macrometa collections, but also support a variety of different sources and sinks.
Supported Stream QL
Stream QL is documented in Stream Worker Queries, but only the following portions of that documentation apply to transformations. If your data pipeline needs are met by the following functionality, then a transformation might be the best solution:
- Query, except for the joins and stream worker examples.
- Values
- Event Types
- Functions
- Filters
- Aggregate Functions
- HAVING|WHERE
- GROUP BY
- ORDER BY
- LIMIT and OFFSET
- Output Rate Limiting
- In-line Windows, which are windows defined in the query.
Unsupported Stream QL
The following Stream QL and stream worker elements are not supported. If you need to use them in your data pipeline, then you might need to create a stream worker:
- Custom Script Functions
- JOIN
- Partitions
- Named Windows
- Named Aggregations
- Ad Hoc Queries
- Creating any other elements, including Tables, Sources, and Sinks.
Syntax
For more information about query syntax, refer to Query Syntax.
INSERT INTO
Output
SELECT
< expressions >
FROM
Input
WHERE
< condition > -- optional
GROUP BY
< fields > -- optional
HAVING
< condition > -- optional
Example 1
INSERT INTO Output
SELECT "Coinbase Pro" as exchange,
"USA" as quote_region,
"BTC/USD" as symbol,
avg(convert(price, 'double')) as ma,
convert(price, 'double') as close,
time:timestampInMilliseconds()/1000 as timestamp
FROM Input;
This transformation sets three constant fields, calculates the average price, converts the price to a double data type, and records the current time in seconds.
- "Coinbase Pro" as exchange, "USA" as quote_region, "BTC/USD" as symbol: These expressions hardcode the values of the exchange, quote_region, and symbol fields to "Coinbase Pro", "USA", and "BTC/USD", respectively.
- avg(convert(price, 'double')) as ma: This takes the average of the price field, which is first converted to a double data type. The result is aliased as ma.
- convert(price, 'double') as close: Here, the price is simply converted to a double data type, and the result is aliased as close.
- time:timestampInMilliseconds()/1000 as timestamp: The current time in milliseconds, returned by time:timestampInMilliseconds(), is divided by 1000 to convert it to seconds, and the result is aliased as timestamp.
This transformation runs on all the documents from your Input collection and inserts the transformed documents into the Output collection.
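To make the semantics of Example 1 concrete, the following Python sketch mimics the query on a few hypothetical documents. It is not Stream QL and not Macrometa's implementation. It assumes that avg, used without a window, behaves as a running average over the documents processed so far, and that dividing the millisecond timestamp by 1000 truncates to whole seconds; neither detail is stated on this page. The function name, the now_ms parameter, and the sample documents are invented for illustration.

```python
import time
from typing import Callable, Dict, Iterable, Iterator


def transform_example_1(
    docs: Iterable[Dict[str, str]],
    now_ms: Callable[[], int] = lambda: int(time.time() * 1000),
) -> Iterator[Dict[str, object]]:
    """Mimic Example 1: constant fields, running average, conversion, timestamp."""
    total = 0.0
    count = 0
    for doc in docs:
        close = float(doc["price"])  # convert(price, 'double')
        total += close
        count += 1
        yield {
            "exchange": "Coinbase Pro",     # hardcoded constant
            "quote_region": "USA",          # hardcoded constant
            "symbol": "BTC/USD",            # hardcoded constant
            "ma": total / count,            # assumed running average (see lead-in)
            "close": close,
            "timestamp": now_ms() // 1000,  # milliseconds to whole seconds (assumed truncation)
        }


# Hypothetical input documents; price arrives as a string.
sample_docs = [{"price": "100.0"}, {"price": "102.0"}, {"price": "104.0"}]

# A fixed clock keeps the sketch deterministic.
example_1_output = list(
    transform_example_1(sample_docs, now_ms=lambda: 1_700_000_000_500)
)
```

Each output document carries the same three constants, its own close value, and an ma value that reflects every price seen up to that point.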
Example 2
INSERT INTO Output
SELECT exchange,
quote_region,
symbol,
timestamp,
trade_location,
trade_price,
trade_strategy,
trade_type
FROM Input
WHERE
symbol == "MBDX" AND
trade_location == "CA";
This transformation query performs the following operations:
- SELECT exchange, quote_region, symbol, timestamp, trade_location, trade_price, trade_strategy, trade_type: This part of the query specifies the fields from the Input collection that should be included in the Output collection. These are: exchange, quote_region, symbol, timestamp, trade_location, trade_price, trade_strategy, and trade_type.
- WHERE symbol == "MBDX" AND trade_location == "CA": This condition filters the data from the Input collection to only include documents where the symbol field is equal to "MBDX" and the trade_location field is equal to "CA".
As a result, the Output collection will contain only the documents from the Input collection that satisfy the specified condition, and each document in the Output collection will include only the fields specified in the SELECT clause.
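The filter-then-project behavior of Example 2 can be sketched in Python as follows. This only models what the query does to each document; it is not how Macrometa executes it. The helper make_doc, the field values, and the extra internal_note field are hypothetical.

```python
from typing import Dict, Iterable, Iterator

# Fields listed in the SELECT clause of Example 2.
SELECTED_FIELDS = (
    "exchange", "quote_region", "symbol", "timestamp",
    "trade_location", "trade_price", "trade_strategy", "trade_type",
)


def transform_example_2(docs: Iterable[Dict[str, object]]) -> Iterator[Dict[str, object]]:
    """Keep documents matching the WHERE condition, then keep only selected fields."""
    for doc in docs:
        # WHERE symbol == "MBDX" AND trade_location == "CA"
        if doc.get("symbol") == "MBDX" and doc.get("trade_location") == "CA":
            yield {field: doc.get(field) for field in SELECTED_FIELDS}


def make_doc(symbol: str, trade_location: str) -> Dict[str, object]:
    """Build a hypothetical input document with one field the SELECT omits."""
    return {
        "exchange": "ExampleExchange",
        "quote_region": "USA",
        "symbol": symbol,
        "timestamp": 1700000000,
        "trade_location": trade_location,
        "trade_price": 12.5,
        "trade_strategy": "momentum",
        "trade_type": "buy",
        "internal_note": "not selected",
    }


input_docs = [make_doc("MBDX", "CA"), make_doc("MBDX", "NY"), make_doc("OTHR", "CA")]
example_2_output = list(transform_example_2(input_docs))
```

Only the first sample document passes both conditions, and its internal_note field is dropped because it is not named in the SELECT clause.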
Example 3 - Rollup
INSERT INTO Output
SELECT productId,
region,
sum(convert(amount, 'double')) as totalAmount
FROM Input
GROUP BY productId, region;
This transformation query, known as a "rollup," executes the following actions:
- SELECT productId, region, sum(convert(amount, 'double')) as totalAmount: This part of the query instructs the transformation to include the productId and region fields in the Output collection. Furthermore, it computes the sum of the amount field, after converting it to a double data type, and labels the result as totalAmount.
- GROUP BY productId, region: This section groups the data from the Input collection based on the productId and region fields. For each unique combination of productId and region, it calculates the total sales amount.
In summary, this transformation collects documents from the Input collection, groups them by productId and region, and calculates the total amount for each group. The transformed data, which includes productId, region, and totalAmount for each group, is then inserted into the Output collection. This allows for an efficient rollup of sales data by product and region.
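As a rough model of the rollup, the Python sketch below groups hypothetical documents by productId and region and sums the converted amount. It returns one final total per group; whether the transformation emits a running total per incoming document or only a final total per group is not stated on this page, so treat that choice as an assumption. The function name and sample data are illustrative only.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def rollup(docs: Iterable[Dict[str, str]]) -> List[Dict[str, object]]:
    """Sum amount per (productId, region), mirroring GROUP BY productId, region."""
    totals: Dict[Tuple[str, str], float] = defaultdict(float)
    for doc in docs:
        key = (doc["productId"], doc["region"])
        totals[key] += float(doc["amount"])  # sum(convert(amount, 'double'))
    return [
        {"productId": product_id, "region": region, "totalAmount": total}
        for (product_id, region), total in totals.items()
    ]


# Hypothetical sales documents; amount arrives as a string.
sales_docs = [
    {"productId": "p1", "region": "US", "amount": "10.5"},
    {"productId": "p1", "region": "US", "amount": "4.5"},
    {"productId": "p2", "region": "EU", "amount": "3"},
]
rollup_output = rollup(sales_docs)
```

The two p1/US documents collapse into a single output document, while p2/EU stays on its own, which is the reduction in document count that makes a rollup useful.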