Bellboy – JavaScript data stream ETL engine
source link: https://www.tuicool.com/articles/hit/BbIJj2z
bellboy
Highly performant JavaScript data stream ETL engine.
How does it work?
Bellboy streams input data row by row. Each row passes through a user-defined function where it can be transformed. Once enough rows have been collected to fill a batch, the batch is loaded to the destination.
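The flow described above can be pictured with a small, self-contained sketch. It is a conceptual model only, not bellboy's actual implementation: `runPipeline`, `source` and `load` are hypothetical names, and flushing the final partial batch at the end of the stream is an assumption.

```javascript
// Conceptual sketch only: this is NOT bellboy's source code.
// `runPipeline`, `source` and `load` are hypothetical names.
async function runPipeline({ source, recordGenerator, batchSize, load }) {
    let batch = [];
    for await (const row of source) {
        // every row passes through the user-defined generator,
        // which may yield zero, one or many transformed records
        for await (const record of recordGenerator(row)) {
            batch.push(record);
            if (batch.length >= batchSize) {
                await load(batch);
                batch = [];
            }
        }
    }
    // assumption: whatever is left is flushed once the source is exhausted
    if (batch.length > 0) {
        await load(batch);
    }
}
```

Because the loop awaits `load` before reading the next row, a slow destination naturally slows down reading from the source in this sketch.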
Installation
npm install bellboy
Example
const bellboy = require('bellboy');
const fs = require('fs');
const path = require('path');
const { promisify } = require('util');
const rename = promisify(fs.rename);

(async () => {
    const srcPath = `C:/source`;
    // tell bellboy to process all Excel files in folder
    // and export every record to Postgres
    const processor = new bellboy.ExcelProcessor({
        path: srcPath,
        hasHeader: true,
        destinations: [
            {
                type: 'postgres',
                connection: {
                    user: 'user',
                    password: 'password',
                    server: 'localhost',
                    database: 'bellboy'
                },
                // tell bellboy to send records to destination
                // as soon as it collects 9999 records
                batchSize: 9999,
                // in addition to processed record fields,
                // add a new one - status field before sending to destination
                recordGenerator: async function* (record) {
                    yield {
                        ...record,
                        status: 'done'
                    };
                }
            }
        ]
    });
    // tell it to move file away as soon as it was processed
    processor.on('processedFile', async (file) => {
        const filePath = path.join(srcPath, file);
        const newFilePath = path.join(`./destination`, file);
        await rename(filePath, newFilePath);
    });
    // get it going!
    await processor.process();
})();
Processors
Each processor in bellboy is a class which has a single responsibility of processing data of a specific type:
- MqttProcessor processes MQTT protocol messages.
- HttpProcessor processes data received from an HTTP call.
- ExcelProcessor processes XLSX file data from the file system.
- JsonProcessor processes JSON file data from the file system.
- PostgresProcessor processes data received from a PostgreSQL SELECT.
- MssqlProcessor processes data received from a MSSQL SELECT.
- DynamicProcessor processes dynamically generated data.
- TailProcessor processes new lines added to the file.
Processor instance methods
- process (async function()): Starts processing data.
- on (function(event, async function listener)): Intercepts the specified event and pauses processing until the listener function has been executed. If the listener returns a truthy value, processing will be stopped.
// move file to the new location when processedFile event is fired
processor.on('processedFile', async (file) => {
    const filePath = path.join(srcPath, file);
    const newFilePath = path.join(`./destination`, file);
    await rename(filePath, newFilePath);
});
Options
Each processor has a specific set of options in addition to the general options:
- destinations (Destination[], required): List of processor destinations.
- verbose (boolean): If set to true, all events will be logged to stdout.
Events
- startProcessing: Emitted when the processor starts working.
- processedFile: Emitted when the processor ends its work.
- transformingBatch: Emitted when a batch is about to be transformed, right before calling the batchTransformer function.
- transformedBatch: Emitted when a batch has been transformed, after calling the batchTransformer function.
- loadingBatch: Emitted when a batch is about to be loaded to the destination.
- loadingBatchError (error): Emitted when a batch load has failed.
- loadedBatch: Emitted when a batch has been loaded.
MqttProcessor
Listens for messages and processes them one by one. It also handles backpressure by queuing messages, so all messages can be eventually processed.
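How bellboy queues messages internally is not described here. As a rough illustration of the general technique (buffer incoming messages, handle them strictly one at a time), here is a minimal sketch; `SerialQueue` is a hypothetical class and is not part of bellboy's API.

```javascript
// Illustrative sketch of "queue and process one by one".
// SerialQueue is a hypothetical helper, not a bellboy class.
class SerialQueue {
    constructor(handler) {
        this.handler = handler; // async function called once per message
        this.pending = [];      // messages waiting to be handled
        this.running = false;   // true while the drain loop is active
        this.done = Promise.resolve();
    }

    // Buffer a message; start draining if nothing is running yet.
    // Returns a promise that settles when the current drain loop ends.
    push(message) {
        this.pending.push(message);
        if (!this.running) {
            this.running = true;
            this.done = this.drain();
        }
        return this.done;
    }

    async drain() {
        try {
            while (this.pending.length > 0) {
                // the next message is not touched until this one is finished
                await this.handler(this.pending.shift());
            }
        } finally {
            this.running = false;
        }
    }
}
```

Messages that arrive while a handler is still busy simply wait in `pending`, so a burst of traffic is absorbed instead of dropped. In this sketch a failing handler stops the loop and leaves the remaining messages queued until the next `push`.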
Options
- General processor options
- connection (object, required)
  - url (string)
  - topics (string[])
HttpProcessor
Options
- General processor options
- connection (object, required): Options from request library.
- dataFormat (delimited | json, required)
- delimiter (string, required for delimited)
- jsonPath (string, required for json): Only values that match the provided JSONPath will be processed.
- nextRequest (async function(header)): Function which must return the connection for the next request, or null if the next request is not needed. If the data format is json, it receives a nullable header parameter which contains the data before the first jsonPath match.
// gets next connection from the header until last page is reached
nextRequest: async function (header) {
    if (header) {
        const pagination = header.pagination;
        if (pagination.total_pages > pagination.current_page) {
            return {
                ...connection,
                url: `${url}&current_page=${pagination.current_page + 1}`
            };
        }
    }
    return null;
},
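The fragment above relies on `connection` and `url` variables defined elsewhere. A self-contained variant that can be tried in isolation might look like the sketch below. `createNextRequest` is a hypothetical helper, and the `pagination.current_page` / `pagination.total_pages` header shape is taken from the fragment above, so a real API may name these fields differently.

```javascript
// Hypothetical helper that builds a nextRequest function for a given
// base connection. The header shape (pagination.current_page and
// pagination.total_pages) is an assumption about the remote API.
function createNextRequest(connection) {
    return async function nextRequest(header) {
        if (!header || !header.pagination) {
            return null; // nothing to paginate on
        }
        const { current_page, total_pages } = header.pagination;
        if (total_pages > current_page) {
            // build the next page URL without manual string concatenation
            const nextUrl = new URL(connection.url);
            nextUrl.searchParams.set('current_page', String(current_page + 1));
            return { ...connection, url: nextUrl.toString() };
        }
        return null; // last page reached
    };
}
```

Using the `URL` class avoids having to know whether the base URL already contains a query string.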
Directory processors
Used for streaming text data from files in a directory. There are currently three types of directory processors: ExcelProcessor, JsonProcessor and TailProcessor. Such processors search for files in the source directory and process them one by one.
Options
- General processor options
- path (string, required): Path to the directory where files are located.
- filePattern (string): Regex pattern for the files to be processed. If not specified, all files in the directory will be matched.
- files (string[]): Array of file names. If not specified, all files in the directory will be matched against the filePattern regex and processed in alphabetical order.
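The interaction between `files` and `filePattern` can be summarised in a few lines. The sketch below is an illustration of the documented rules, not bellboy's code: `selectFiles` is a hypothetical function, it assumes the pattern is compiled with `new RegExp(...)`, and it uses JavaScript's default string sort as a stand-in for "alphabetical order".

```javascript
// Hypothetical illustration of the file selection rules described above.
// directoryListing: names of all files found in the source directory.
function selectFiles(directoryListing, { filePattern, files } = {}) {
    // an explicit list of file names wins over pattern matching
    if (files && files.length > 0) {
        return [...files];
    }
    // assumption: the pattern string is compiled as a regular expression
    const regex = filePattern ? new RegExp(filePattern) : null;
    return directoryListing
        .filter((name) => (regex ? regex.test(name) : true))
        .sort(); // default sort compares UTF-16 code units
}
```

For example, `selectFiles(['b.xlsx', 'a.xlsx', 'notes.txt'], { filePattern: '\\.xlsx$' })` keeps only the two spreadsheets and orders them by name.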
Events
- processingFile (file, filePath): Emitted when a file is about to be processed.
- processedFile (file, filePath): Emitted after a file has been processed.
ExcelProcessor
Processes XLSX files.
Options
- Directory processor options
- hasHeader (boolean): Whether the worksheet has a header or not, false by default.
- skipRows (number): How many rows to skip, 0 by default.
- sheetName (string)
- sheetIndex (number): Starts from 0.
- sheetGetter (async function(sheets)): Function which receives the array of sheets as a parameter and must return the name of the required sheet.
// returns last sheet name
sheetGetter: async (sheets) => {
    return sheets[sheets.length - 1];
},
If no sheetName is specified, the value of sheetIndex will be used. If it isn't specified either, the sheetGetter function will be called. If none of these options is specified, the first sheet will be processed.
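The precedence just described (sheetName, then sheetIndex, then sheetGetter, then the first sheet) can be written down as a short function. This is a sketch of the documented rule, not bellboy's code. `resolveSheetName` is a hypothetical name, and `sheets` is assumed to be an array of sheet names, which matches the sheetGetter example above.

```javascript
// Hypothetical sketch of the sheet resolution order described above.
// Assumes `sheets` is an array of sheet names.
async function resolveSheetName(sheets, { sheetName, sheetIndex, sheetGetter } = {}) {
    if (sheetName !== undefined) {
        return sheetName;             // 1. explicit name
    }
    if (sheetIndex !== undefined) {
        return sheets[sheetIndex];    // 2. zero-based index
    }
    if (sheetGetter) {
        return sheetGetter(sheets);   // 3. user-supplied async getter
    }
    return sheets[0];                 // 4. default: first sheet
}
```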
JsonProcessor
Options
- Directory processor options
- jsonPath (string, required): Only values that match the provided JSONPath will be processed.
TailProcessor
Watches for file changes and outputs last part of file as soon as new lines are added to the file.
Options
- Directory processor options
- fromBeginning (boolean): In addition to emitting new lines, emits lines from the beginning of the file, false by default.
recordGenerator's row
- file (string): Name of the file the data came from.
- data (string)
Database processors
Process a SELECT query row by row. There are two database processors, PostgresProcessor and MssqlProcessor. Both have the same options.
Options
- General processor options
- query (string, required): Query to execute.
- connection (object, required)
  - user
  - password
  - server
  - host
  - database
  - schema: Currently available only for PostgresProcessor.
DynamicProcessor
Processor which generates records on the fly. Can be used to define custom data processors.
Options
- General processor options
- generator (async generator function, required): Generator function which must yield records to process.
// generates 10 records dynamically
generator: async function* () {
    for (let i = 0; i < 10; i++) {
        yield { data: i };
    }
},
Destinations
Every processor can have as many destinations (outputs) as needed. For example, one processor can load processed data into a database, log this data to stdout and post it by HTTP simultaneously.
- stdout logs data to console.
- http executes HTTP request calls.
- postgres inserts/upserts data to PostgreSQL database.
- mssql inserts data to MSSQL database.
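To make the "several destinations at once" idea concrete, here is a sketch of a `destinations` array built only from the option names listed in this document. All values (table name, credentials, endpoint, batch sizes) are placeholders, and the `method` / `uri` keys in the http `setup` are assumed to be passed through to the request library unchanged.

```javascript
// Sketch of a destinations array; every value is a placeholder.
const destinations = [
    {
        // load into PostgreSQL in batches of 1000 records
        type: 'postgres',
        batchSize: 1000,
        setup: {
            table: 'records',
            connection: {
                user: 'user',
                password: 'password',
                host: 'localhost',
                database: 'bellboy',
            },
        },
    },
    {
        // print every record to the console as a table
        type: 'stdout',
        batchSize: 1,
        asTable: true,
    },
    {
        // post records over HTTP; setup is assumed to hold request options
        type: 'http',
        batchSize: 1,
        setup: {
            method: 'POST',
            uri: 'http://localhost:3000/records',
        },
    },
];
```

Such an array would be passed as the `destinations` option of any processor.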
Options
- batchSize (number, required): Number of records to be processed before loading them to the destination.
- recordGenerator (async generator function(row)): Function which processes and transforms every row from the source.
- batchTransformer (async function(rows)): Function which processes and transforms a whole batch of rows. It is called after the row count reaches batchSize. Data is loaded to the destination immediately after this function has been executed.
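As an illustration of how the two hooks differ, the sketch below normalises rows one by one and then de-duplicates a whole batch. The `email` field is a made-up example. Two behaviours are assumptions rather than documented facts: that a row for which recordGenerator yields nothing is skipped, and that the array returned by batchTransformer is what gets loaded.

```javascript
// Row-level hook: runs once per source row.
// Assumption: yielding nothing drops the row.
const recordGenerator = async function* (row) {
    if (!row.email) {
        return; // skip rows without an email
    }
    yield { ...row, email: row.email.toLowerCase() };
};

// Batch-level hook: runs once per collected batch.
// Assumption: the returned array replaces the batch.
const batchTransformer = async function (rows) {
    const seen = new Set();
    return rows.filter((row) => {
        if (seen.has(row.email)) {
            return false; // duplicate within this batch
        }
        seen.add(row.email);
        return true;
    });
};
```

Per-row work such as cleaning or enriching fields fits recordGenerator, while anything that needs to see several rows together, such as de-duplication or aggregation, fits batchTransformer.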
stdout
This destination logs out all data to stdout (console).
Options
- General destination options
- type (stdout, required)
- asTable (boolean): If set to true, data will be printed as a table.
http
Puts processed data one by one in the body and executes the specified HTTP request.
Options
- General destination options
- type (http, required)
- setup (object, required): Options from request library.
postgres
Inserts data to PostgreSQL.
Options
- General destination options
- type (postgres, required)
- setup (object, required)
  - table (string): Table name.
  - upsertConstraints (string[]): If specified, an UPSERT command will be executed based on the provided constraints.
  - connection (object)
    - user
    - password
    - server
    - host
    - database
    - schema
mssql
Inserts data to MSSQL.
Options
- General destination options
- type (mssql, required)
- setup (object, required)
  - table (string): Table name.
  - connection (object)
    - user
    - password
    - server
    - host
    - database
Custom
Custom destination defined by the load function.
Options
- General destination options
- type (custom, required)
- load (async function(rows), required): Function which can be extended to implement a custom destination.
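As an example of what a load function might do, the sketch below appends each batch to a newline-delimited JSON file. The output path and batch size are arbitrary placeholders. Only `type`, `batchSize` and `load` come from the options listed in this document.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Placeholder output location for this sketch.
const outputFile = path.join(os.tmpdir(), 'bellboy-custom-destination.ndjson');

const customDestination = {
    type: 'custom',
    batchSize: 100,
    // called with an array of rows once a batch is ready
    load: async function (rows) {
        if (rows.length === 0) {
            return; // nothing to write
        }
        // one JSON document per line
        const lines = rows.map((row) => JSON.stringify(row)).join('\n') + '\n';
        await fs.promises.appendFile(outputFile, lines, 'utf8');
    },
};
```

The same shape works for any sink with an async client, for example a message queue or a search index: do the write inside `load` and let the returned promise signal completion.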
Testing
Tests can be run by using the docker-compose up command.