The `fluvio produce` command is a way to send records to the leader of a partition. Produce records by specifying the destination topic:
Write messages to a topic/partition

Usage: fluvio produce [OPTIONS] <topic>

Arguments:
  <topic>  The name of the Topic to produce to

Options:
  -v, --verbose
          Print progress output when sending records
      --key <KEY>
          Sends key/value records with this value as the key
      --key-separator <KEY_SEPARATOR>
          Sends key/value records split on the first instance of the separator
      --raw
          Send all input as one record. Use this when producing binary files
      --compression <COMPRESSION>
          Compression algorithm to use when sending records. Supported values: none, gzip, snappy, zstd and lz4
  -f, --file <FILE>
          Path to a file to produce to the topic. By default, each line is treated as a single record unless `--raw` is specified. If absent, the producer reads from stdin
      --linger <LINGER>
          Time to wait before sending. Ex: '150ms', '20s'
      --batch-size <BATCH_SIZE>
          Max amount of bytes accumulated before sending
      --isolation <ISOLATION>
          Isolation level that the producer must respect. Supported values: read_committed (ReadCommitted) - wait for records to be committed before responding, read_uncommitted (ReadUncommitted) - only wait for the leader to accept the records
      --delivery-semantic <DELIVERY_SEMANTIC>
          Delivery guarantees that the producer must respect. Supported values: at_most_once (AtMostOnce) - send records without waiting for a response, at_least_once (AtLeastOnce) - send records and retry if an error occurs [default: at-least-once]
      --smartmodule <SMARTMODULE>
          Name of the SmartModule
      --smartmodule-path <SMARTMODULE_PATH>
          Path to the SmartModule
      --aggregate-initial <AGGREGATE_INITIAL>
          (Optional) Value to use as an initial accumulator for aggregate SmartModules
  -e, --params <PARAMS>
          (Optional) Extra input parameters passed to the SmartModule. They should be passed in key=value format, e.g. fluvio produce topic-name --smartmodule my_filter -e foo=bar -e key=value -e one=1
      --transforms-file <TRANSFORMS_FILE>
          (Optional) Path to a file with a transformation specification
  -t, --transform <TRANSFORM>
          (Optional) Transformation specification as a JSON-formatted string, e.g. fluvio produce topic-name --transform='{"uses":"infinyon/jolt@0.1.0","with":{"spec":"[{\"operation\":\"default\",\"spec\":{\"source\":\"test\"}}]"}}'
  -h, --help
          Print help
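Several of these options can be combined in a single invocation. Below is a rough sketch using only the flags documented above; the topic name and file paths are assumptions for illustration:

# Batch records: wait up to 500ms or until 16384 bytes accumulate, and compress with gzip
$ fluvio produce my-topic -f records.txt --linger 500ms --batch-size 16384 --compression gzip

# Send an entire binary file as a single record
$ fluvio produce my-topic --raw -f ./my-file.bin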
The quickest way to send a record using the producer is to just type your record into standard input:
$ fluvio produce my-topic
> This is my first record ever
Ok!
> This is my second record ever
Ok!
> ^C
As the help text above says, each line that you type will be sent as a new record to the topic. The `Ok!` was printed by the producer after each record to let us know the record was sent successfully.
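Since the producer reads from stdin when no `--file` is given, records can also be piped in non-interactively. A quick sketch using the same example topic:

$ echo "This is my third record ever" | fluvio produce my-topic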
Fluvio supports key/value records out-of-the-box. In a key/value record, the key is used to decide which partition the record is sent to. Let’s try sending some simple key/value records:
$ fluvio produce my-topic --key-separator=":"
> alice:Alice In Wonderland
Ok!
> batman:Bruce Wayne
Ok!
> ^C
So our records are being sent, but how do we know that the producer recognized each key properly? We can use the `--verbose` (`-v`) flag to tell the producer to print back the keys and values it recognizes. That way, we can be confident that our records are being sent the way we want them to be.
$ fluvio produce my-topic -v --key-separator=":"
> santa:Santa Claus
[santa] Santa Claus
Ok!
> ^C
The producer splits the key from the value and prints it in a `[key] value` format.
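Relatedly, if every record in a session should carry the same key, the `--key` option from the help text above can be used instead of a separator. In this sketch, every record typed in the session would be sent with the key `santa`:

$ fluvio produce my-topic --key="santa"
> Santa Claus
Ok!
> ^C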
Fluvio’s SmartModules can be applied to the producer to edit the contents of a stream after the records are sent but before they are committed. One way to supply a WASM SmartModule to the `fluvio produce` command is with the `--smartmodule-path` option.
Below is an example of how to apply a Map type SmartModule to transform each record in a stream. This particular SmartModule can be used to capitalize every letter in a string.
To use the SmartModule, compile it and provide the `.wasm` file to the producer:
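If you don't already have a compiled SmartModule on hand, one possible way to scaffold and build one is with the SmartModule Development Kit (`smdk`); the project name `my-map` is an assumption here, and the exact path of the resulting `.wasm` file may vary with your setup:

$ smdk generate my-map   # scaffold a new SmartModule project
$ cd my-map
$ smdk build             # compile the project to a .wasm file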
$ fluvio produce my-topic --smartmodule-path="fluvio_smartmodule_map.wasm"
To avoid sending the SmartModule binary to the cluster with every producer session, you can ask the cluster to store it for you:
$ fluvio smartmodule create --wasm-file="fluvio_smartmodule_map.wasm" my_map
Then just use the name you provided to apply it:
$ fluvio produce my-topic --smartmodule="my_map"
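To confirm the transform was applied, we could read the topic back with `fluvio consume`. Assuming a record was produced through the uppercasing SmartModule above, we would expect it to come back in all caps, along these lines:

$ fluvio consume my-topic -B
THIS IS MY FIRST RECORD EVER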
When producing to a topic with multiple partitions, the producer will send all records with the same key to the same partition. Let’s test this out by making a multi-partition topic, then sending some key/value records.
First, we’ll use `fluvio topic create` to create a topic called `multi-keys` with 5 partitions:
$ fluvio topic create multi-keys -p 5
Next, let’s create a text file with the records we want to send:
# Put the following records into a text file using your favorite editor
$ cat records.txt
rafael:create account
rafael:validate account
samuel:create account
tabitha:create account
rafael:add item 1234 to cart
tabitha:validate account
samuel:validate account
rafael:add item 2345 to cart
tabitha:add item 9876 to cart
rafael:complete purchase
Then, we’ll send the key/value records from the file, using the `--key-separator` flag to separate our keys from our values. In this example, the keys are unique usernames.
$ fluvio produce multi-keys --key-separator=":" -f records.txt
Looking at this sample input, we can see that `rafael` generated 5 events, `samuel` generated 2 events, and `tabitha` generated 3 events. When we look at the partitions, we should see the records distributed in groups of 5, 2, and 3. We can use the `fluvio partition list` command to view the distribution of records in our partitions:
$ fluvio partition list
TOPIC       PARTITION  LEADER  REPLICAS  RESOLUTION  SIZE   HW  LEO  LRS  FOLLOWER OFFSETS
multi-keys  0          0       []        Online      0 B    0   0    0    []
multi-keys  1          0       []        Online      0 B    0   0    0    []
multi-keys  2          0       []        Online      157 B  3   3    0    []
multi-keys  3          0       []        Online      220 B  5   5    0    []
multi-keys  4          0       []        Online      119 B  2   2    0    []
By looking at the high watermark (HW) and log-end-offset (LEO) of our partitions, we can see how many records have landed in each partition. As we expected, they were distributed in groups of 5, 3, and 2. Let’s dig a little further: we know that `rafael` was the key used by the group of 5 records, so we should be able to see those records by using `fluvio consume` to consume from partition 3.
$ fluvio consume multi-keys -B -p3 --key-value
[rafael] create account
[rafael] validate account
[rafael] add item 1234 to cart
[rafael] add item 2345 to cart
[rafael] complete purchase
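The other key groups can be checked the same way. Based on the partition listing above, `tabitha`'s 3 records should have landed on partition 2 and `samuel`'s 2 records on partition 4. For example:

$ fluvio consume multi-keys -B -p2 --key-value
[tabitha] create account
[tabitha] validate account
[tabitha] add item 9876 to cart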
When we produce to a topic with multiple partitions, records that have no key are assigned to partitions in a round-robin fashion. This ensures an even load distribution among the partitions.
To see this in action, let’s create a topic with multiple partitions using `fluvio topic create`.
$ fluvio topic create multi-no-keys -p 5
Let’s produce some data to our topic. We’ll use the same data from Example 3, but this time we won’t tell the producer to interpret our input as key/value records (we’ll do this by omitting the `--key-separator` flag).
# Put the following records into a text file using your favorite editor
$ cat records.txt
rafael:create account
rafael:validate account
samuel:create account
tabitha:create account
rafael:add item 1234 to cart
tabitha:validate account
samuel:validate account
rafael:add item 2345 to cart
tabitha:add item 9876 to cart
rafael:complete purchase
Next, we’ll produce the records into the `multi-no-keys` stream.
$ fluvio produce multi-no-keys -f records.txt
Since records with no keys use round-robin partitioning, we should expect to see the records evenly distributed among the partitions. This differs from Example 3 in that the records are not grouped by any kind of key. Let’s take a look at our partitions using `fluvio partition list`.
$ fluvio partition list
TOPIC          PARTITION  LEADER  REPLICAS  RESOLUTION  SIZE   HW  LEO  LRS  FOLLOWER OFFSETS
multi-no-keys  0          0       []        Online      120 B  2   2    0    []
multi-no-keys  1          0       []        Online      121 B  2   2    0    []
multi-no-keys  2          0       []        Online      124 B  2   2    0    []
multi-no-keys  3          0       []        Online      126 B  2   2    0    []
multi-no-keys  4          0       []        Online      127 B  2   2    0    []
Notice how the high watermark (HW) and log-end-offset (LEO) tell us that there are exactly 2 records in each partition. Our ten records have been evenly distributed!
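As a spot check, we could consume a single partition. Each partition should hold exactly two records, though exactly which two depends on the order in which the producer dispatched them; the output below is illustrative only:

$ fluvio consume multi-no-keys -B -p0
rafael:create account
tabitha:validate account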
Fluvio supports different types of compression algorithms for sending records. Compression generally improves throughput in exchange for some CPU cost to compress and decompress the data. Let’s try the `gzip` algorithm in the CLI.
First, we’ll use `fluvio topic create` to create a topic called `compressed` and another topic called `uncompressed`:
$ fluvio topic create compressed
$ fluvio topic create uncompressed
Next, let’s create a text file called `records.txt` with the following contents:
{"ts":"2020-06-18T10:44:12","started":{"pid":45678}}
{"ts":"2020-06-18T10:44:13","logged_in":{"username":"foo"},"connection":{"addr":"1.2.3.4","port":5678}}
{"ts":"2020-06-18T10:44:15","registered":{"username":"bar","email":"bar@example.com"},"connection":{"addr":"2.3.4.5","port":6789}}
{"ts":"2020-06-18T10:44:16","logged_out":{"username":"foo"},"connection":{"addr":"1.2.3.4","port":5678}}
{"ts":"2020-06-18T10:49:29","logged_in":{"username":"foo"},"connection":{"addr":"1.2.3.4","port":5678}}
{"ts":"2020-06-18T10:50:13","logged_in":{"username":"bar"},"connection":{"addr":"2.3.4.5","port":6789}}
{"ts":"2020-06-18T10:51:13","logged_out":{"username":"bar"},"connection":{"addr":"2.3.4.5","port":6789}}
Next, we’ll produce the records from that file into the `compressed` stream using the `--compression gzip` option.
$ fluvio produce compressed -f records.txt --compression gzip
Let’s also produce the same records to the `uncompressed` stream, using no compression.
$ fluvio produce uncompressed -f records.txt # when no --compression flag is passed, `none` is used as the compression algorithm
Since records are compressed by the producer before they are sent to the SPU, their disk usage on the SPU should be lower than without compression. Let’s take a look at the disk usage of the partitions using `fluvio partition list`.
$ fluvio partition list
TOPIC         PARTITION  LEADER  REPLICAS  RESOLUTION  SIZE   HW  LEO  LRS  FOLLOWER OFFSETS
compressed    0          0       []        Online      328 B  7   7    0    []
uncompressed  0          0       []        Online      821 B  7   7    0    []
Notice how the SIZE field tells us that the `compressed` topic is using less disk space than the `uncompressed` topic for the same number of records.
Also note that consuming from topics works the same way for both compressed and uncompressed data.
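For example, reading the compressed topic back requires no extra flags; records are decompressed transparently on the way out:

$ fluvio consume compressed -B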