From 801d38368c03af69b44df01b44189411625d2350 Mon Sep 17 00:00:00 2001 From: Chuck Larrieu Casias <87353672+chuck-alt-delete@users.noreply.github.com> Date: Wed, 22 Mar 2023 09:05:28 -0700 Subject: [PATCH] refactor faker parsing and enable array relationships (#85) * refactor faker parsing and enable array relationships * add helpful error message * update ecommerce example * slight change to array example * update ecommerce example * accommodate breaking change to pass tests * update readme * add warning about executing user input to readme * fix typo * beef up examples with blog example * bump version --- README.md | 22 ++- datagen.ts | 2 +- examples/README.md | 1 + examples/blog/README.md | 60 +++++++ examples/blog/blog.json | 61 +++++++ examples/ecommerce/README.md | 264 ++++++++++++++++++------------ examples/ecommerce/blog.json | 61 ------- examples/ecommerce/ecommerce.json | 44 ++--- package.json | 2 +- src/schemas/generateMegaRecord.ts | 86 +++++----- src/schemas/parseAvroSchema.ts | 30 ++-- src/schemas/parseSqlSchema.ts | 10 +- tests/array.json | 24 +++ tests/iterationIndex.json | 14 +- tests/products.sql | 8 +- tests/schema-nested.json | 32 ++-- tests/schema.json | 38 ++--- 17 files changed, 453 insertions(+), 306 deletions(-) create mode 100644 examples/blog/README.md create mode 100644 examples/blog/blog.json delete mode 100644 examples/ecommerce/blog.json create mode 100644 tests/array.json diff --git a/README.md b/README.md index b5be4b6..e914907 100644 --- a/README.md +++ b/README.md @@ -95,7 +95,11 @@ See example input schema files in [examples](./examples) and [tests](/tests) fol 1. Iterate through a schema defined in SQL 10 times, but don't actually interact with Kafka or Schema Registry ("dry run"). Also, see extra output with debug mode. ```bash - datagen --schema tests/products.sql --format avro --dry-run --debug + datagen \ + --schema tests/products.sql \ + --format avro \ + --dry-run \ + --debug ``` 1. Same as above, but actually create the schema subjects and Kafka topics, and actually produce the data. There is less output because debug mode is off. @@ -146,7 +150,7 @@ This is particularly useful when you want to generate a small set of records wit "topic": "mz_datagen_users" }, "id": "iteration.index", - "name": "internet.userName", + "name": "faker.internet.userName()", } ] ``` @@ -181,13 +185,15 @@ docker run \ You can define input schemas using JSON (`.json`), Avro (`.avsc`), or SQL (`.sql`). Within those schemas, you use the [FakerJS API](https://fakerjs.dev/api/) to define the data that is generated for each field. -You can pass arguments to `faker` methods by escaping quotes. For example, here is [datatype.number](https://fakerjs.dev/api/datatype.html#number) with `min` and `max` arguments: +You can pass arguments to `faker` methods by escaping quotes. For example, here is [faker.datatype.number](https://fakerjs.dev/api/datatype.html#number) with `min` and `max` arguments: ``` -"datatype.number({\"min\": 100, \"max\": 1000})" +"faker.datatype.number({min: 100, max: 1000})" ``` > :construction: Right now, JSON is the only kind of input schema that supports generating relational data. + +> :warning: Please inspect your input schema file since `faker` methods can contain arbitrary Javascript functions that `datagen` will execute. ### JSON Schema Here is the general syntax for a JSON input schema: @@ -229,10 +235,10 @@ The SQL schema option allows you to use a `CREATE TABLE` statement to define wha ```sql CREATE TABLE "ecommerce"."products" ( "id" int PRIMARY KEY, - "name" varchar COMMENT 'internet.userName', - "merchant_id" int NOT NULL COMMENT 'datatype.number', - "price" int COMMENT 'datatype.number', - "status" int COMMENT 'datatype.boolean', + "name" varchar COMMENT 'faker.internet.userName()', + "merchant_id" int NOT NULL COMMENT 'faker.datatype.number()', + "price" int COMMENT 'faker.datatype.number()', + "status" int COMMENT 'faker.datatype.boolean()', "created_at" datetime DEFAULT (now()) ); ``` diff --git a/datagen.ts b/datagen.ts index d1395ed..6254ad3 100755 --- a/datagen.ts +++ b/datagen.ts @@ -17,7 +17,7 @@ import dataGenerator from './src/dataGenerator.js'; import fs from 'fs'; import { program, Option } from 'commander'; -program.name('datagen').description('Fake Data Generator').version('0.1.4'); +program.name('datagen').description('Fake Data Generator').version('0.2.0'); program .requiredOption('-s, --schema ', 'Schema file to use') diff --git a/examples/README.md b/examples/README.md index 957ecda..287b9cb 100644 --- a/examples/README.md +++ b/examples/README.md @@ -6,5 +6,6 @@ This directory contains end-to-end tutorials for the `datagen` tool. | -------- | ----------- | | [ecommerce](ecommerce) | A tutorial for the `datagen` tool that generates data for an ecommerce website. | | [docker-compose](docker-compose) | A `docker-compose` setup for the `datagen`. | +| [blog](blog) | Sample data for a blog with users, posts, and comments. | To request a new tutorial, please [open an issue](https://github.com/MaterializeInc/datagen/issues/new?assignees=&labels=feature%2C+enhancement&template=feature_request.md&title=Feature%3A+). diff --git a/examples/blog/README.md b/examples/blog/README.md new file mode 100644 index 0000000..5489243 --- /dev/null +++ b/examples/blog/README.md @@ -0,0 +1,60 @@ +# Blog Demo + +This small example generates relational data for a blog where users make posts, and posts have comments by other users. + +## Inspect the Schema + +1. Take a moment to look at [blog.json](./blog.json) and make a prediction about what the output will look like. + +## Do a Dry Run + +Here is a command to do a dry run of a single iteration. + +``` +datagen \ + --dry-run \ + --debug \ + --schema examples/blog/blog.json \ + --format avro\ + --prefix mz_datagen_blog \ + --number 1 +``` + +Notice that in a single iteration, a user is created, and then 2 posts are created, and for each post, 2 comments are created. Then, since comments are made by users, 2 additional users are created. This happens in such a way that the value of a field in a parent record is passed to child records (eg if `users.id` is `5`, then each associated post will have `posts.user_id` equal to `5`). This makes it so downstream systems can perform meaningful joins. + +Also notice the number of unique primary keys of each collection are limited, so over time you will see each key appear multiple times. These can be interpreted in upstream systems as updates. + +## (Optional) Produce to Kafka + +See [.env.example](../../.env.example) to see the environment variables to connect to your Kafka cluster. +If you use the `--format avro` option, you would also have to set environment variables to connect to your Schema Registry. + +After you set those, you can produce to your Kafka cluster. Press `Ctrl+C` when you are ready to stop the producer. + +``` +datagen \ + --schema examples/blog/blog.json \ + --format avro \ + --prefix mz_datagen_blog \ + --number -1 +``` + +When you are finished, you can delete all the topics and schema subjects with the `--clean` option. + +``` +datagen \ + --schema examples/blog/blog.json \ + --format avro \ + --prefix mz_datagen_blog \ + --clean +``` + +## (Optional) Query in Materialize + +Materialize is a [streaming database](https://materialize.com/guides/streaming-database/). You create materialized views with standard SQL and Materialize will eagerly read from Kafka topics and Postgres tables and keep your materialized views up to date automatically in response to new data. It's Postgres wire compatible, so you can read your materialized views directly with the `psql` CLI or any Postgres client library. + +See the [ecommerce example](../ecommerce/README.md) for a full end-to-end example where data is transformed in and served from Materialize in near real-time. + +### Learn More + +Check out the Materialize [docs](www.materialize.com/docs) and [blog](www.materialize.com/blog) for more! \ No newline at end of file diff --git a/examples/blog/blog.json b/examples/blog/blog.json new file mode 100644 index 0000000..70a4133 --- /dev/null +++ b/examples/blog/blog.json @@ -0,0 +1,61 @@ +[ + { + "_meta": { + "topic": "users", + "key": "id", + "relationships": [ + { + "topic": "posts", + "parent_field": "id", + "child_field": "user_id", + "records_per": 2 + } + ] + }, + "id": "faker.datatype.number(100)", + "name": "faker.internet.userName()", + "email": "faker.internet.exampleEmail()", + "phone": "faker.phone.imei()", + "website": "faker.internet.domainName()", + "city": "faker.address.city()", + "company": "faker.company.name()" + }, + { + "_meta": { + "topic": "posts", + "key": "id", + "relationships": [ + { + "topic": "comments", + "parent_field": "id", + "child_field": "post_id", + "records_per": 2 + } + ] + }, + "id": "faker.datatype.number(1000)", + "user_id": "faker.datatype.number(100)", + "title": "faker.lorem.sentence()", + "body": "faker.lorem.paragraph()" + }, + { + "_meta": { + "topic": "comments", + "key": "id", + "relationships": [ + { + "topic": "users", + "parent_field": "user_id", + "child_field": "id", + "records_per": 1 + } + ] + }, + "id": "faker.datatype.number(2000)", + "user_id": "faker.datatype.number(100)", + "body": "faker.lorem.paragraph", + "post_id": "faker.datatype.number(1000)", + "views": "faker.datatype.number({min: 100, max: 1000})", + "status": "faker.datatype.number(1)" + } +] diff --git a/examples/ecommerce/README.md b/examples/ecommerce/README.md index 6f52a44..a35f410 100644 --- a/examples/ecommerce/README.md +++ b/examples/ecommerce/README.md @@ -10,51 +10,51 @@ Here is the input schema: [ { "_meta": { - "topic": "mz_datagen_ecommerce_users", + "topic": "users", "key": "id", - "relationships": [ - { - "topic": "mz_datagen_ecommerce_purchases", + "relationships": [ + { + "topic": "purchases", "parent_field": "id", "child_field": "user_id", "records_per": 4 } - ] + ] }, - "id": "datatype.number(100)", - "name": "internet.userName", - "email": "internet.exampleEmail", - "city": "address.city", - "state": "address.state", - "zipcode": "address.zipCode" + "id": "faker.datatype.number(1000)", + "name": "faker.internet.userName()", + "email": "faker.internet.exampleEmail()", + "city": "faker.address.city()", + "state": "faker.address.state()", + "zipcode": "faker.address.zipCode()" }, { "_meta": { - "topic": "mz_datagen_ecommerce_purchases", + "topic": "purchases", "key": "id", - "relationships": [ + "relationships": [ { - "topic": "mz_datagen_ecommerce_items", - "parent_field": "item_id", - "child_field": "id", - "records_per": 1 + "topic": "items", + "parent_field": "item_ids", + "child_field": "id" } - ] + ] }, - "id": "datatype.number(1000)", - "user_id": "datatype.number(100)", - "item_id": "datatype.number(5000)" + "id": "faker.datatype.uuid()", + "user_id": "this string can be anything since this field is determined by user.id", + "item_ids": "faker.helpers.uniqueArray((()=>{return Math.floor(Math.random()*5000);}), Math.floor(Math.random()*4+1))", + "total": "faker.commerce.price(25, 2500)" }, { "_meta": { - "topic": "mz_datagen_ecommerce_items", + "topic": "items", "key": "id" }, - "id": "datatype.number(5000)", - "name": "commerce.product", - "price": "commerce.price", - "description": "commerce.productDescription", - "material": "commerce.productMaterial" + "id": "this string can be anything since this field is determined by purchases.item_ids", + "name": "faker.commerce.product()", + "price": "faker.commerce.price(5, 500)", + "description": "faker.commerce.productDescription()", + "material": "faker.commerce.productMaterial()" } ] ``` @@ -62,18 +62,20 @@ Here is the input schema: On each iteration of `datagen`: 1. A user is created, then 1. Four purchases are created that are associated with that user, and then -1. An item is created for each purchase so that the purchase's `item_id` is equal to the `id` for each item associated with it. +1. Up to five items are created for each purchase so that each item ID in the purchase's `item_ids` array is used as an `id` in the `items` dataset. Here are a couple of important ideas to note: - The file is a list of datasets - Each dataset has a `_meta` object that specifies the key, Kafka topic, and a list of relationships - Each relationship specifies the topic, parent field, matching child field, and how many child records should be produced for each parent record + - If the parent field is an array, the size of the array determines how many child record are produced - Each dataset has several fields, where field name is mapped to a [FakerJS API](https://fakerjs.dev/api/) method. -- The primary keys happen to use `faker.datatype()` - - In effect, this limits the key space for these records. - - For example, user ID is specified with `datatype.number(100)`, which means there will be a maximum of 100 unique users, and if a new user is produced with the same ID, it will be interpreted as an update in the downstream database (more on Materialize's `UPSERT` envelope later). -- Since each purchase record has only one item, we will need multiple records with the same purchase ID in order to show all the different items. That means we will need to do some aggregations by purchase ID downstream (more on Materialize's `NONE` envelope, a.k.a. append-only sources later) -- Given the key spaces, it's possible for multiple users to generate different purchase records with the same purchase ID. We can interpret that to mean multiple users can participate in a purchase. +- The primary keys for `users` and `items` use `faker.datatype.number` + - In effect, this limits the key space for these datasets. + - For example, user ID is specified with `datatype.number(1000)`, which means there will be a maximum of 1000 unique users, and if a new user is produced with the same ID, it will be interpreted as an update in the downstream database (more on Materialize's `UPSERT` envelope later). +- Notice `purchases.item_ids` uses `faker.helpers.uniqueArray` and some Javascript functions to build an array. This is cool and fun, but just make sure you carefully inspect your input schema file since `datagen` will execute some Javascript + +> :warning: Repeat -- please inspect your input schema file since `faker` methods can contain arbitrary Javascript functions that `datagen` will execute. ## Set up infrastructure @@ -91,26 +93,30 @@ This tutorial will use a Confluent Cloud Basic Kafka Cluster and Schema Registry 1. [Install datagen](../../README.md#installation) if you haven't already. 1. Create a `.env` file with your Kafka and Schema Registry credentials (see [.env.example](../../.env.example)). -1. Generate a single iteration of records with dry run and debug modes and check the output. +1. Generate a single iteration of records with dry run and debug modes and check the output. You will see the Avro schemas that will be registered with Schema Registry, along with a single iteration's worth of records. You will see one user created, 4 purchases, and up to 5 items. ```bash datagen \ - --schema ecommerce.json \ + --schema examples/ecommerce/ecommerce.json \ --format avro \ --number 1 \ + --prefix mz_datagen_ecommerce \ --dry-run \ --debug ``` 1. Start producing data to Kafka while you set up Materialize. ```bash datagen \ - -s ecommerce.json \ + -s examples/ecommerce/ecommerce.json \ -f avro \ -n -1 \ + -p mz_datagen_ecommerce \ --wait 500 ``` ### Materialize +Materialize is a [streaming database](https://materialize.com/guides/streaming-database/). You create materialized views with standard SQL and Materialize will eagerly read from Kafka topics and Postgres tables and keep your materialized views up to date automatically in response to new data. It's Postgres wire compatible, so you can read your materialized views directly with the `psql` CLI or any Postgres client library. + 1. [Register for access](https://materialize.com/register/) to Materialize. 1. Enable your region. 1. In a separate terminal session, [install `psql`](https://materialize.com/docs/integrations/sql-clients/#psql). @@ -165,7 +171,7 @@ This tutorial will use a Confluent Cloud Basic Kafka Cluster and Schema Registry r1 (SIZE='2xsmall') ); ``` -1. Create `UPSERT` sources for `users` and `items`. +1. Create `UPSERT` sources for `users`, `purchases`, and `items`. ```sql CREATE SOURCE users IN CLUSTER sources @@ -174,31 +180,32 @@ This tutorial will use a Confluent Cloud Basic Kafka Cluster and Schema Registry KEY FORMAT BYTES VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr + INCLUDE TIMESTAMP AS ts ENVELOPE UPSERT; ``` ```sql - CREATE SOURCE items + CREATE SOURCE purchases IN CLUSTER sources FROM KAFKA CONNECTION confluent_kafka - (TOPIC 'mz_datagen_ecommerce_items') + (TOPIC 'mz_datagen_ecommerce_purchases') KEY FORMAT BYTES VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr + INCLUDE TIMESTAMP AS ts ENVELOPE UPSERT; ``` - > :notebook: [`UPSERT` envelope](https://materialize.com/docs/sql/create-sink/#upsert-envelope) means that Kafka records of the same key will be interpreted as inserts (key doesn't exist yet), updates (key already exists), or deletes (`null` payload, a.k.a. tombstone). -1. Create an append-only (`ENVELOPE NONE`) source for `purchases`. ```sql - CREATE SOURCE purchases + CREATE SOURCE items IN CLUSTER sources FROM KAFKA CONNECTION confluent_kafka - (TOPIC 'mz_datagen_ecommerce_purchases') + (TOPIC 'mz_datagen_ecommerce_items') KEY FORMAT BYTES VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr - ENVELOPE NONE; + INCLUDE TIMESTAMP AS ts + ENVELOPE UPSERT; ``` - > :notebook: A source that uses `ENVELOPE NONE` is referred to as an [append-only](https://materialize.com/docs/sql/create-source/#append-only-envelope) source. In this case, we treat all new records as inserts, even though they have the same key. In this case, a single purchase can have multiple rows corresponding to the different items in the purchase. + > :notebook: [`UPSERT` envelope](https://materialize.com/docs/sql/create-sink/#upsert-envelope) means that Kafka records of the same key will be interpreted as inserts (key doesn't exist yet), updates (key already exists), or deletes (`null` payload, a.k.a. tombstone). ## Query the Results @@ -209,82 +216,124 @@ Materialize specializes in efficient, incremental view maintenance over changing ```sql SET CLUSTER = ecommerce; ``` -1. Create a bunch of indexes. - ```sql - CREATE INDEX "users_idx" ON "users" ("id"); - CREATE INDEX "items_idx" ON "items" ("id"); - CREATE INDEX "purchases_idx_items" ON "purchases" ("item_id"); - CREATE INDEX "purchases_idx_users" ON "purchases" ("user_id"); - ``` -1. Create a view that calculates the purchase history for each user. +1. Create some views related to recent purchase history for each user. ```sql + -- create a view of items that were apart of purchases in the last minute CREATE VIEW - "purchases_agg" - AS - SELECT - "p"."id", - "list_agg"("i"."id") AS "item_ids", - "list_agg"("u"."id") AS "user_ids", - "sum"("i"."price"::"numeric") AS "total" + purchases_items_last_min + AS + SELECT id, user_id, unnest(item_ids) AS item_id, ts + FROM purchases + WHERE + mz_now() >= ts + AND + mz_now() <= ts + INTERVAL '1m'; + + -- create indexes for the join columns of the next view. + CREATE INDEX ON purchases_items_last_min (item_id); + CREATE INDEX ON items (id); + + -- enrich the previous view with item description and item material + CREATE VIEW + purchases_items_last_min_enriched + AS + SELECT + p.id, + p.user_id, + p.item_id, + i.material, + i.description, + p.ts FROM - "purchases" AS "p" - JOIN - "items" AS "i" - ON "p"."item_id" = "i"."id" - JOIN - "users" AS "u" - ON "p"."user_id" = "u"."id" - GROUP BY - "p"."id"; - ``` -1. Create an index on that view to compute the results and load them into memory for efficient point lookups. - ```sql - CREATE INDEX "purchases_agg_idx_user_ids" ON "purchases_agg" ("user_ids"); + purchases_items_last_min AS p + JOIN items AS i ON p.item_id = i.id; + + -- create indexes on join columns for the next view + CREATE INDEX ON purchases (user_id); + CREATE INDEX ON users (id); + + -- calculate the sum of purchases over the last minute for each user + CREATE VIEW + purchases_total_last_min + AS + SELECT + u.id AS user_id, + list_agg(p.id) AS purchase_ids, + SUM(CAST(p.total AS numeric)) AS total_last_min + FROM + users AS u + JOIN purchases AS p ON u.id = p.user_id + WHERE + mz_now() >= p.ts + AND + mz_now() <= p.ts + INTERVAL '1m' + GROUP BY u.id; + + -- create index on join column for the next view + CREATE INDEX ON purchases_items_last_min_enriched (user_id); + + -- create a view that includes user email, items, materials, and total spend + -- for each user over the last minute + CREATE VIEW + items_and_total_last_min + AS + SELECT + pi.user_id, + u.email, + pt.total_last_min, + list_agg(pi.item_id) AS item_ids, + list_agg(pi.material) AS materials + FROM + purchases_items_last_min_enriched AS pi + JOIN + purchases_total_last_min AS pt + ON pi.user_id = pt.user_id + JOIN users AS u ON pi.user_id = u.id + GROUP BY 1, 2, 3; + + -- create index to compute the results and load them into memory + -- for efficient point lookups by email + CREATE INDEX ON items_and_total_last_min (email); ``` -1. Look up the purchase history for various users. +1. Look up the purchase history for various user emails. ```sql - SELECT * FROM "purchases_agg" WHERE 70 = ANY ("user_ids"); + SELECT * FROM items_and_total_last_min WHERE email LIKE '%example.net'; ``` ``` - id | item_ids | user_ids | total - -----+-----------------------+---------------+------- - 690 | {3931} | {70} | 903 - 762 | {3704} | {70} | 670 - 401 | {1396,3966} | {19,70} | 1453 - 704 | {2345,2956} | {1,70} | 1699 - 219 | {2212,4823} | {53,70} | 1799 - 7 | {1793,3125,1088} | {3,70,95} | 755 - 174 | {57,617,4768} | {65,70,85} | 2687 - 983 | {1301,2906,3935,2557} | {8,21,70,100} | 2023 + -[ RECORD 1 ]--+--------------------------------------------------------------------------------------------------------------------- + user_id | 955 + email | Stanford60@example.net + total_last_min | 5621 + item_ids | {3590,3392,1888,2656,3436,417,458,463,752,4080} + materials | {Fresh,Fresh,Steel,Steel,Steel,Bronze,Frozen,Rubber,Wooden,Plastic} + -[ RECORD 2 ]--+--------------------------------------------------------------------------------------------------------------------- + user_id | 213 + email | Emilia.Kuhlman@example.net + total_last_min | 5168 + item_ids | {3090,2329,4639,1330,3403,3664,84,3985,4500,4269,766} + materials | {Soft,Fresh,Steel,Bronze,Bronze,Cotton,Cotton,Wooden,Plastic,Plastic,Plastic} + ... ``` > :bulb: Notice how the results are non empty! If we were generating random records, these joins would be empty because there would likely be no matches on the join conditions. -1. Subscribe to changes in purchase history for a particular user in near-real time. +1. Subscribe to changes in recent purchase history in near-real time. Press `Ctrl+C` to quit. ```sql - COPY - (SUBSCRIBE - (SELECT - item_ids, user_ids, total - FROM - "purchases_agg" - WHERE - 4 = ANY ("user_ids"))) - TO - STDOUT; + COPY (SUBSCRIBE (SELECT * FROM items_and_total_last_min)) TO STDOUT; ``` ``` - time | diff | item_ids | user_ids | total - 1678475187360 1 {2175} {4} 839 - 1678475187360 1 {4675,3147} {4,20} 1369 - 1678475187360 1 {1423,3769,501,4860} {4,17,59,74} 1818 - 1678475187360 1 {530,4653,670,4767,3558,501} {2,3,4,22,38,91} 2571 - 1678475200000 1 {2585,2442} {4,88} 414 - 1678475200000 1 {539,423} {4,94} 1453 - 1678475200000 1 {869,944} {4,29} 1087 - 1678475200000 1 {3678,2426,4481} {4,45,47} 789 - 1678475293000 -1 {2585,2442} {4,88} 414 - 1678475293000 1 {2585,2442} {4,88} 943 + time | diff | user_id | email | total_last_min | item_ids | materials + 1679429934200 1 596 Talon.Ondricka@example.com 3198 {2870,2491,2752,3049,2032,4596} {Fresh,Steel,Rubber,Rubber,Granite,Concrete} + 1679429934200 -1 596 Talon.Ondricka@example.com 4112 {2870,3386,2491,2752,3049,2032,4596} {Soft,Fresh,Steel,Rubber,Rubber,Granite,Concrete} + 1679429934326 1 596 Talon.Ondricka@example.com 973 {2870,3049,4596} {Steel,Rubber,Concrete} + 1679429934326 -1 596 Talon.Ondricka@example.com 3198 {2870,2491,2752,3049,2032,4596} {Fresh,Steel,Rubber,Rubber,Granite,Concrete} + 1679429934441 -1 596 Talon.Ondricka@example.com 973 {2870,3049,4596} {Steel,Rubber,Concrete} + 1679429935000 1 346 Chaz_Zboncak83@example.com 991 {571,1421,2730,3310} {Fresh,Cotton,Rubber,Plastic} + 1679429935000 -1 269 Keon.Schumm@example.net 6747 {4,3101,2846,1067,4170,4940,4437,2430,898,4272,1494} {Soft,Fresh,Steel,Bronze,Bronze,Bronze,Cotton,Cotton,Rubber,Wooden,Wooden} + 1679429935000 1 269 Keon.Schumm@example.net 6747 {4,3101,2846,1067,4170,4940,4437,2430,898,4272,1494} {Soft,Bronze,Cotton,Frozen,Frozen,Rubber,Wooden,Wooden,Plastic,Plastic,Plastic} ...and so on ``` + > :bulb: What's really cool about this is the calculated total is fully consistent with the list of items at all times even though they come from different views. Yay for consistency! + + > :bulb: We see diffs of +1 and -1 as purchases exit the 1 minute window and as users make new purchases. There will also be automatic updates if the user changes their email address. Your views are always up to date in response to newly arriving data. ## Clean up @@ -299,6 +348,7 @@ Materialize specializes in efficient, incremental view maintenance over changing datagen \ -s ecommerce.json \ -f avro \ + -p mz_datagen_ecommerce \ --clean ``` diff --git a/examples/ecommerce/blog.json b/examples/ecommerce/blog.json deleted file mode 100644 index dfff1e1..0000000 --- a/examples/ecommerce/blog.json +++ /dev/null @@ -1,61 +0,0 @@ -[ - { - "_meta": { - "topic": "mz_datagen_blog_users", - "key": "id", - "relationships": [ - { - "topic": "mz_datagen_blog_posts", - "parent_field": "id", - "child_field": "user_id", - "records_per": 2 - } - ] - }, - "id": "datatype.number(100)", - "name": "internet.userName", - "email": "internet.exampleEmail", - "phone": "phone.imei", - "website": "internet.domainName", - "city": "address.city", - "company": "company.name" - }, - { - "_meta": { - "topic": "mz_datagen_blog_posts", - "key": "id", - "relationships": [ - { - "topic": "mz_datagen_blog_comments", - "parent_field": "id", - "child_field": "post_id", - "records_per": 2 - } - ] - }, - "id": "datatype.number(1000)", - "user_id": "datatype.number(100)", - "title": "lorem.sentence", - "body": "lorem.paragraph" - }, - { - "_meta": { - "topic": "mz_datagen_blog_comments", - "key": "id", - "relationships": [ - { - "topic": "mz_datagen_blog_users", - "parent_field": "user_id", - "child_field": "id", - "records_per": 1 - } - ] - }, - "id": "datatype.number(2000)", - "user_id": "datatype.number(100)", - "body": "lorem.paragraph", - "post_id": "datatype.number(1000)", - "views": "datatype.number({\"min\": 100, \"max\": 1000})", - "status": "datatype.number(1)" - } -] diff --git a/examples/ecommerce/ecommerce.json b/examples/ecommerce/ecommerce.json index 1d79477..5f41045 100644 --- a/examples/ecommerce/ecommerce.json +++ b/examples/ecommerce/ecommerce.json @@ -1,50 +1,50 @@ [ { "_meta": { - "topic": "mz_datagen_ecommerce_users", + "topic": "users", "key": "id", "relationships": [ { - "topic": "mz_datagen_ecommerce_purchases", + "topic": "purchases", "parent_field": "id", "child_field": "user_id", "records_per": 4 } ] }, - "id": "datatype.number(100)", - "name": "internet.userName", - "email": "internet.exampleEmail", - "city": "address.city", - "state": "address.state", - "zipcode": "address.zipCode" + "id": "faker.datatype.number(1000)", + "name": "faker.internet.userName()", + "email": "faker.internet.exampleEmail()", + "city": "faker.address.city()", + "state": "faker.address.state()", + "zipcode": "faker.address.zipCode()" }, { "_meta": { - "topic": "mz_datagen_ecommerce_purchases", + "topic": "purchases", "key": "id", "relationships": [ { - "topic": "mz_datagen_ecommerce_items", - "parent_field": "item_id", - "child_field": "id", - "records_per": 1 + "topic": "items", + "parent_field": "item_ids", + "child_field": "id" } ] }, - "id": "datatype.number(1000)", - "user_id": "datatype.number(100)", - "item_id": "datatype.number(5000)" + "id": "faker.datatype.uuid()", + "user_id": "this string can be anything since this field is determined by user.id", + "item_ids": "faker.helpers.uniqueArray((()=>{return Math.floor(Math.random()*5000);}), Math.floor(Math.random()*4+1))", + "total": "faker.commerce.price(25, 2500)" }, { "_meta": { - "topic": "mz_datagen_ecommerce_items", + "topic": "items", "key": "id" }, - "id": "datatype.number(5000)", - "name": "commerce.product", - "price": "commerce.price", - "description": "commerce.productDescription", - "material": "commerce.productMaterial" + "id": "this string can be anything since this field is determined by purchases.item_ids", + "name": "faker.commerce.product()", + "price": "faker.commerce.price(5, 500)", + "description": "faker.commerce.productDescription()", + "material": "faker.commerce.productMaterial()" } ] diff --git a/package.json b/package.json index d348320..750b09b 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@materializeinc/datagen", "description": "Materialize Datagen CLI tool", - "version": "0.1.4", + "version": "0.2.0", "license": "Apache-2.0", "type": "module", "bin": { diff --git a/src/schemas/generateMegaRecord.ts b/src/schemas/generateMegaRecord.ts index 0f1bd90..74f1bf0 100644 --- a/src/schemas/generateMegaRecord.ts +++ b/src/schemas/generateMegaRecord.ts @@ -1,58 +1,53 @@ import { faker } from '@faker-js/faker'; import alert from 'cli-alerts'; + export async function generateRandomRecord(fakerRecord: any, generatedRecord: any = {}){ // helper function to generate a record from json schema with faker data + for (const field in fakerRecord) { if (field in generatedRecord) { continue } if (typeof fakerRecord[field] === 'object') { generatedRecord[field] = await generateRandomRecord(fakerRecord[field]) - } else { - if (fakerRecord[field] === 'iteration.index'){ - generatedRecord[field] = global.iterationIndex + 1; - continue; - } - try { - const [fakerMethod, ...property] = fakerRecord[field].split('.'); - const fakerProperty = property.join('.'); - if (fakerProperty.includes('(')) { - const property = fakerProperty.split('(')[0]; - let args = fakerProperty.split('(')[1].split(')')[0]; - - if (!args.includes('{')) { - args = !isNaN(args) ? Number(args) : args === 'true' ? true : args === 'false' ? false : args; - } else { - try { - args = JSON.parse(args); - } catch (error) { - alert({ - type: `error`, - name: `JSON parse error`, - msg: `${error.message}\n${JSON.stringify(generatedRecord, null, 2)}` - }); - } - } - generatedRecord[field] = faker[fakerMethod][property](args); - } else { - generatedRecord[field] = faker[fakerMethod][fakerProperty](); - } + continue + } + + if (fakerRecord[field] === 'iteration.index'){ + generatedRecord[field] = global.iterationIndex + 1; + continue; + } + if (fakerRecord[field].match("faker\..*")) { + try { + generatedRecord[field] = + (new Function( + 'faker', + `return ${fakerRecord[field]};` + ))(faker); + } catch (error) { alert({ type: `error`, name: `Faker Error`, - msg: `${error.message}\n${JSON.stringify(generatedRecord, null, 2)}` + msg: `${error.message}\nGenerated record:\n${JSON.stringify(generatedRecord, null, 2)}` }); - process.exit(); + process.exit(1); } + } else { + alert({ + type: `error`, + name: `Faker Error`, + msg: `Could not parse Faker method. See FakerJS API documentation.\nFailed while parsing:\n${fakerRecord[field]}` + }); + process.exit(1); } - } return generatedRecord; } + export async function generateMegaRecord(schema: any) { // goal is to return a "mega record" with structure // {topic: {key: the topic key field name, records: [list of records to send to Kafka]} @@ -101,16 +96,27 @@ export async function generateMegaRecord(schema: any) { // for every existing record, generate "records_per" // number of new records for the dependent topic for (const existingRecord of megaRecord[topic].records) { - for (let i = 1; i <= relationship.records_per; i++) { - let newRecord = {} - // ensure the new record obeys the foriegn key constraint - // specified in the relationship - newRecord[relationship.child_field] = existingRecord[relationship.parent_field] - if (!megaRecord[relatedTopic]) { - megaRecord[relatedTopic] = { "key": _meta.key, "records": [] } + // ensure the new record obeys the foreign key constraint + // specified in the relationship + let newRecords = []; + let existingValue = existingRecord[relationship.parent_field]; + if (Array.isArray(existingValue)) { + for (let i = 0; i < existingValue.length; i++) { + let newRecord = {}; + newRecord[relationship.child_field] = existingValue[i] + newRecords.push(newRecord); } - megaRecord[relatedTopic].records.push(newRecord); + } else { + for (let i = 1; i <= relationship.records_per; i++) { + let newRecord = {}; + newRecord[relationship.child_field] = existingValue; + newRecords.push(newRecord); + } + } + if (!megaRecord[relatedTopic]) { + megaRecord[relatedTopic] = { "key": _meta.key, "records": [] }; } + megaRecord[relatedTopic].records.push(...newRecords); } } } diff --git a/src/schemas/parseAvroSchema.ts b/src/schemas/parseAvroSchema.ts index 3de7e05..ce03e0a 100644 --- a/src/schemas/parseAvroSchema.ts +++ b/src/schemas/parseAvroSchema.ts @@ -42,7 +42,7 @@ async function convertAvroSchemaToJson(schema: any): Promise { } else { // If nested, generated nested json recursively if (column.type.type === 'array') { - return schema[column.name] = 'datatype.array'; + return schema[column.name] = 'faker.datatype.array()'; } return schema[column.name] = avroTypesToFakerJs(column.type); } @@ -59,32 +59,32 @@ function avroTypesToFakerJs(avroType: any) { switch (avroType) { case 'string': - return 'datatype.string'; + return 'faker.datatype.string()'; case 'int': - return 'datatype.number'; + return 'faker.datatype.number()'; case 'long': - return 'datatype.number'; + return 'faker.datatype.number()'; case 'float': - return 'datatype.number'; + return 'faker.datatype.number()'; case 'double': - return 'datatype.number'; + return 'faker.datatype.number()'; case 'boolean': - return 'datatype.boolean'; + return 'faker.datatype.boolean()'; case 'bytes': - return 'datatype.string'; + return 'faker.datatype.string()'; case 'array': - return 'datatype.array'; + return 'faker.datatype.array()'; case 'map': - return 'datatype.object'; + return 'faker.datatype.object()'; case 'union': - return 'datatype.union'; + return 'faker.datatype.union()'; case 'enum': - return 'datatype.string'; + return 'faker.datatype.string()'; case 'fixed': - return 'datatype.string'; + return 'faker.datatype.string()'; case 'record': - return 'datatype.object'; + return 'faker.datatype.object()'; default: - return 'datatype.string'; + return 'faker.datatype.string()'; } } diff --git a/src/schemas/parseSqlSchema.ts b/src/schemas/parseSqlSchema.ts index 0f86251..2ee1463 100644 --- a/src/schemas/parseSqlSchema.ts +++ b/src/schemas/parseSqlSchema.ts @@ -64,21 +64,21 @@ export async function convertSqlSchemaToJson(tables: Array) { } else { switch (column.definition.dataType.toLowerCase()) { case 'string': - schema[column.column.column] = 'datatype.string'; + schema[column.column.column] = 'faker.datatype.string()'; break; case 'int': case 'serial': case 'bigint': - schema[column.column.column] = 'datatype.number'; + schema[column.column.column] = 'faker.datatype.number()'; break; case 'text': - schema[column.column.column] = 'datatype.string'; + schema[column.column.column] = 'faker.datatype.string()'; break; case 'timestamp': - schema[column.column.column] = 'datatype.datetime'; + schema[column.column.column] = 'faker.datatype.datetime()'; break; default: - schema[column.column.column] = 'datatype.string'; + schema[column.column.column] = 'faker.datatype.string()'; break; } } diff --git a/tests/array.json b/tests/array.json new file mode 100644 index 0000000..c2a39c2 --- /dev/null +++ b/tests/array.json @@ -0,0 +1,24 @@ +[ + { + "_meta": { + "key": "id", + "topic": "test", + "relationships": [ + { + "topic": "test2", + "parent_field": "arr", + "child_field": "id" + } + ] + }, + "id": "faker.datatype.uuid()", + "arr": "faker.helpers.uniqueArray((()=>{return Math.floor(Math.random()*5000);}), Math.floor(Math.random()*5))" + }, + { + "_meta": { + "topic": "test2", + "key": "id" + }, + "id": "" + } +] \ No newline at end of file diff --git a/tests/iterationIndex.json b/tests/iterationIndex.json index f973a53..245f547 100644 --- a/tests/iterationIndex.json +++ b/tests/iterationIndex.json @@ -4,13 +4,13 @@ "key": "id" }, "id": "iteration.index", - "timestamp": "date.between('2020-01-01T00:00:00.000Z', '2030-01-01T00:00:00.000Z')", + "timestamp": "faker.date.between('2020-01-01T00:00:00.000Z', '2030-01-01T00:00:00.000Z')", "location": { - "latitude": "datatype.number({ \"max\": 90, \"min\": -90})", - "longitude": "datatype.number({ \"max\": 180, \"min\": -180})" + "latitude": "faker.datatype.number({ max: 90, min: -90})", + "longitude": "faker.datatype.number({ max: 180, min: -180})" }, - "pm25": "datatype.float({ \"min\": 10, \"max\": 90 })", - "pm10": "datatype.float({ \"min\": 10, \"max\": 90 })", - "temperature": "datatype.float({ \"min\": -10, \"max\": 120 })", - "humidity": "datatype.float({ \"min\": 0, \"max\": 100 })" + "pm25": "faker.datatype.float({ min: 10, max: 90 })", + "pm10": "faker.datatype.float({ min: 10, max: 90 })", + "temperature": "faker.datatype.float({ min: -10, max: 120 })", + "humidity": "faker.datatype.float({ min: 0, max: 100 })" } diff --git a/tests/products.sql b/tests/products.sql index 6c52e7b..136abb6 100644 --- a/tests/products.sql +++ b/tests/products.sql @@ -1,9 +1,9 @@ CREATE TABLE "ecommerce"."products" ( "id" int PRIMARY KEY, - "name" varchar COMMENT 'internet.userName', - "merchant_id" int NOT NULL COMMENT 'datatype.number', - "price" int COMMENT 'datatype.number({ "min": 1000, "max": 100000 })', - "status" int COMMENT 'datatype.boolean', + "name" varchar COMMENT 'faker.internet.userName()', + "merchant_id" int NOT NULL COMMENT 'faker.datatype.number()', + "price" int COMMENT 'faker.datatype.number({ min: 1000, max: 100000 })', + "status" int COMMENT 'faker.datatype.boolean()', "created_at" datetime DEFAULT (now()) ); diff --git a/tests/schema-nested.json b/tests/schema-nested.json index fdd9f5f..cb89c43 100644 --- a/tests/schema-nested.json +++ b/tests/schema-nested.json @@ -2,26 +2,26 @@ "_meta": { "topic": "mz_datagen_users" }, - "id": "datatype.uuid", - "name": "internet.userName", - "email": "internet.exampleEmail", - "phone": "phone.imei", - "website": "internet.domainName", - "city": "address.city", - "company": "company.name", - "age": "datatype.number", - "created_at": "datatype.datetime", + "id": "faker.datatype.uuid()", + "name": "faker.internet.userName()", + "email": "faker.internet.exampleEmail()", + "phone": "faker.phone.imei()", + "website": "faker.internet.domainName()", + "city": "faker.address.city()", + "company": "faker.company.name()", + "age": "faker.datatype.number()", + "created_at": "faker.datatype.datetime()", "posts": [ { - "id": "datatype.uuid", - "title": "lorem.sentence", - "body": "lorem.paragraph", + "id": "faker.datatype.uuid()", + "title": "faker.lorem.sentence()", + "body": "faker.lorem.paragraph()", "comments": [ { - "id": "datatype.uuid", - "name": "internet.userName", - "email": "internet.exampleEmail", - "body": "lorem.paragraph" + "id": "faker.datatype.uuid()", + "name": "faker.internet.userName()", + "email": "faker.internet.exampleEmail()", + "body": "faker.lorem.paragraph()" } ] } diff --git a/tests/schema.json b/tests/schema.json index 44d9335..628523e 100644 --- a/tests/schema.json +++ b/tests/schema.json @@ -13,16 +13,16 @@ ] }, "nested": { - "phone": "phone.imei", - "website": "internet.domainName" + "phone": "faker.phone.imei()", + "website": "faker.internet.domainName()" }, - "id": "datatype.number(100)", - "name": "internet.userName", - "email": "internet.exampleEmail", - "phone": "phone.imei", - "website": "internet.domainName", - "city": "address.city", - "company": "company.name" + "id": "faker.datatype.number(100)", + "name": "faker.internet.userName()", + "email": "faker.internet.exampleEmail()", + "phone": "faker.phone.imei()", + "website": "faker.internet.domainName()", + "city": "faker.address.city()", + "company": "faker.company.name()" }, { "_meta": { @@ -37,10 +37,10 @@ } ] }, - "id": "datatype.number(1000)", - "user_id": "datatype.number(100)", - "title": "lorem.sentence", - "body": "lorem.paragraph" + "id": "faker.datatype.number(1000)", + "user_id": "faker.datatype.number(100)", + "title": "faker.lorem.sentence()", + "body": "faker.lorem.paragraph()" }, { "_meta": { @@ -55,11 +55,11 @@ } ] }, - "id": "datatype.number(2000)", - "user_id": "datatype.number(100)", - "body": "lorem.paragraph", - "post_id": "datatype.number(1000)", - "views": "datatype.number({\"min\": 100, \"max\": 1000})", - "status": "datatype.number(1)" + "id": "faker.datatype.number(2000)", + "user_id": "faker.datatype.number(100)", + "body": "faker.lorem.paragraph()", + "post_id": "faker.datatype.number(1000)", + "views": "faker.datatype.number({min: 100, max: 1000})", + "status": "faker.datatype.number(1)" } ]