Flux
Overview
Flux is a performant, easy-to-use ingestion tool being developed at Mach5 Labs.
A Flux job is specified declaratively: a configuration file describes a dataflow graph, and the Flux runtime executes that graph. Nodes in the graph are called processors. A processor has a type, which defines its input, output, and configuration parameters. For example, the std::dir-scanner processor takes directory paths as inputs and outputs file and directory paths matching its configuration.
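To make the dataflow idea concrete, here is a toy sketch in Python. It is not how Flux is implemented, and none of the names in it (run_graph, split_words, to_upper, the "fn" key) come from Flux; they are assumptions made for illustration. Each node is modelled as a function that takes one input value and returns zero or more output values, and a tiny loop forwards every output to the node's targets.

```python
from collections import deque


def split_words(text):
    """Hypothetical processor: one string in, one output per word."""
    return text.split()


def to_upper(word):
    """Hypothetical processor: one word in, one upper-cased word out."""
    return [word.upper()]


# A two-node graph: each node has a function and a list of target node ids.
NODES = {
    "split": {"fn": split_words, "targets": ["upper"]},
    "upper": {"fn": to_upper, "targets": []},
}


def run_graph(nodes, start_id, seed):
    """Feed `seed` to node `start_id` and forward outputs along `targets`.

    Outputs of nodes that have no targets are collected and returned.
    """
    collected = []
    queue = deque([(start_id, seed)])
    while queue:
        node_id, value = queue.popleft()
        node = nodes[node_id]
        for output in node["fn"](value):
            if not node["targets"]:
                collected.append(output)
            for target in node["targets"]:
                queue.append((target, output))
    return collected


if __name__ == "__main__":
    print(run_graph(NODES, "split", "hello flux"))
```

The point of the sketch is the shape of the model: the graph is plain data, the processors are independent units, and the runtime is the only part that knows how values move between them.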
Example configuration
    nodes:
      - id: start
        class: data
        config:
          data: /my-data
        targets:
          - id: ds
            index: 0
      - id: ds
        class: dir-scanner
        config:
          recurse: true
          filter:
            exact-match:
              field: type
              value: file
        targets:
          - id: jr
            index: 0
      - id: jr
        class: replicate
        config:
          inner:
            id: inner_jr
            class: json-reader
          replication: 10
        targets:
          - id: add_timestamp
            index: 0
      - id: add_timestamp
        class: replicate
        config:
          inner:
            id: inner-js-script
            class: js-script
            config:
              script: |
                arg['@timestamp'] = Date.now();
                arg
          replication: 10
        targets:
          - id: bulk_insert
            index: 0
      - id: bulk_insert
        class: replicate
        config:
          inner:
            id: inner-bulk_insert
            class: opensearch-bulk
            config:
              host: localhost
              port: 9200
              path: /os/_bulk
              index: my-index
          replication: 10
At a high level, the above configuration does the following:
- Scans the path /my-data for all files
- Reads the files, expecting newline-delimited JSON (NDJSON)
- For each JSON record, augments the record with a new field containing the current time
- Inserts each record into my-index using the OpenSearch Bulk API
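For readers who want to see what these four steps amount to, the following is a rough single-threaded equivalent in plain Python. It does not use Flux, and all function names are assumptions chosen for this sketch. It stops at building the request body in the format the OpenSearch Bulk API expects (an action line followed by the document, one JSON object per line) and does not send it anywhere.

```python
import json
import time
from pathlib import Path


def scan_files(root):
    """Yield every regular file under `root`, recursing into subdirectories."""
    for path in sorted(Path(root).rglob("*")):
        if path.is_file():
            yield path


def read_ndjson(path):
    """Yield one parsed JSON record per non-empty line of `path`."""
    with open(path, encoding="utf-8") as handle:
        for line in handle:
            if line.strip():
                yield json.loads(line)


def add_timestamp(record):
    """Set '@timestamp' to epoch milliseconds, as Date.now() does in the script."""
    record["@timestamp"] = int(time.time() * 1000)
    return record


def bulk_body(records, index):
    """Build a Bulk API body: an 'index' action line, then the document itself."""
    lines = []
    for record in records:
        lines.append(json.dumps({"index": {"_index": index}}))
        lines.append(json.dumps(record))
    # Every line, including the last, is terminated by a newline.
    return "".join(line + "\n" for line in lines)


def build_request_body(root, index):
    """Chain the four steps: scan, read, augment, format for bulk insert."""
    records = (
        add_timestamp(record)
        for path in scan_files(root)
        for record in read_ndjson(path)
    )
    return bulk_body(records, index)
```

Calling build_request_body("/my-data", "my-index") would return a string that could be sent to a Bulk endpoint. Compared with this sketch, the Flux configuration expresses the same chain declaratively and can run the stages in parallel.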
Notice that some of the processors have been wrapped by a replicate processor. The replicate processor scales the computation of the inner processor that it wraps by creating multiple instances of it. This allows multiple CPU cores to work on different values simultaneously.
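As an analogy only, the wrapping pattern can be sketched in Python with a worker pool. This is not Flux's implementation; replicate and add_length below are hypothetical names, and the sketch makes no claim about how Flux schedules instances or orders results.

```python
from concurrent.futures import ThreadPoolExecutor


def replicate(inner, replication):
    """Wrap `inner` so that up to `replication` calls run concurrently."""
    def run(values):
        with ThreadPoolExecutor(max_workers=replication) as pool:
            # Executor.map returns results in the order of the inputs.
            return list(pool.map(inner, values))
    return run


def add_length(record):
    """Hypothetical inner processor: annotate a record with its text length."""
    return {**record, "length": len(record["text"])}


if __name__ == "__main__":
    process = replicate(add_length, replication=10)
    print(process([{"text": "a"}, {"text": "bcd"}]))
```

Note that in standard CPython builds, threads do not execute Python bytecode in parallel, so this sketch shows the fan-out shape rather than true multi-core speedup. For CPU-bound work in Python, concurrent.futures.ProcessPoolExecutor would be the closer match to what the replicate processor is described as doing.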
To run the dataflow, execute:
flux run job -c config.yaml
Reference
To list all available processors, run:
flux registry list-node-classes
To display configuration values for a processor, run:
flux registry describe-node-class <node-class>