Software engineering

Introduction

BSPump

BSPump is an open-source project written in Python that any developer can use as the basis of a software solution employing real-time data analysis. It forms the core of the BitSwan product, which further expands the BSPump project for individual use cases with its own microservices, configurations and issues.

For more information go to GitHub repository: https://github.com/LibertyAces/BitSwanPump

Where to start

This repository can be cloned and used as a template for one's own BSPump application: https://github.com/LibertyAces/BitSwanPump-BlankApp

The following chapters serve as a guide to creating your own application and describe the essential objects, tools and libraries related to BSPump.

How to build a stream processor application

Design

First, it is important to think about the architecture of the stream processor application. Start by drawing diagrams of the desired components that will form the basis of the application.

The main components are represented by BitSwan top-level objects:

  • Connection
  • Lookup
  • Pipeline
  • Matrix

Connections connect your application with external components such as a MySQL database or Kafka, while pipelines represent individual processing tasks: events arrive from one or more external systems (or from primary pipelines, see the diagram in the Application object section below), are parsed and enriched, and are then sent to an external system, again represented by a connection. For more information about top-level objects, refer to the appropriate sections below.

Every pipeline should then be broken down into individual processors, each responsible for one parsing, enriching or analyzing task. See the diagram in the Pipeline section below.

Start coding

The best practice is to create a Python *.py file and a Python module, both carrying the name of the application. The <Your_application_name>.py file would then simply include and run the application object, which is defined in the app.py file inside the <Your_application_name> module.

The folder structure is as follows:

<Your_application_name>.py
<Your_application_name>/
	__init__.py
	app.py

Start by cloning the BitSwan Blank Application, which already follows the mentioned folder structure. It is available on GitHub: https://github.com/LibertyAces/BitSwanPump-BlankApp

Inside the application object, the top-level BitSwan objects are registered in the following order (for how to do it in code, refer to the Application object section below):

  • connections
  • lookups
  • (secondary) pipelines
  • (primary) pipelines

The reason is that lookups may use connections, so connections need to be created first. In the same manner, secondary pipelines may use both lookups and connections, while primary pipelines may utilize secondary pipelines (through analyzers, for instance), lookups and connections. For more information about top-level objects, see the appropriate sections below. A sketch of this registration order follows.
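
The registration inside the application object may then look as follows; this is a sketch in the style of the Blank Application, where MyLookup, MySecondaryPipeline and MyPrimaryPipeline are placeholder classes:

import bspump
import bspump.elasticsearch


class MyApplication(bspump.BSPumpApplication):

	def __init__(self):
		super().__init__()
		svc = self.get_service("bspump.PumpService")

		# 1. Connections may be used by all other top-level objects
		svc.add_connection(bspump.elasticsearch.ElasticSearchConnection(self, "ESConnection"))

		# 2. Lookups may use connections
		svc.add_lookup(MyLookup(self, "MyLookup"))

		# 3. Secondary pipelines may use lookups and connections
		svc.add_pipeline(MySecondaryPipeline(self))

		# 4. Primary pipelines may use all of the above
		svc.add_pipeline(MyPrimaryPipeline(self))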

Integrations

Connection objects serve for making a connection with another item or service within the infrastructure (see the Connection section below), such as Kafka, a MySQL database, ElasticSearch and so on.

Make sure you give each connection a proper ID, which will later be used in configuration and when obtaining the connection in other objects, such as lookups, pipelines and processors (see the Identification and location of pipeline objects section below). It is a good practice to use one connection per external system or database.

Pipeline construction

Processors should be located in separate files inside subfolders whose names specify the shared responsibility of the processors located inside them. For example, processors that filter, enrich or parse IP addresses should be located in the ip subfolder. Make sure the names are simple and understandable for external programmers.

Depending on the type of the processor, the processor name should have an appropriate suffix such as Filter, Parser or Enricher (see the processor types below in the Processor section). For example, if a processor enriches events with IP addresses, its class name should be IPAddressEnricher. A minimal skeleton of such a processor follows.
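
A minimal sketch of such an enricher, assuming events are Python dictionaries and subclassing the generic bspump.Processor; the enrichment logic itself is purely illustrative:

import bspump


class IPAddressEnricher(bspump.Processor):

	def process(self, context, event):
		# Add a derived attribute based on the event's IP address (illustrative check)
		ip = event.get("ip")
		if ip is not None:
			event["ip_private"] = ip.startswith("10.") or ip.startswith("192.168.")
		return event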

All processors are then imported in the file of the pipeline, which utilizes them. Every pipeline begins with one or a list of source processors and ends with one sink processor (for more information see Pipeline section below).

The typical pipeline with all appropriate processors may look as follows:

import bspump
import bspump.socket
import bspump.elasticsearch


class TCPPipeline(bspump.Pipeline):

	def __init__(self, app):
		super().__init__(app)
		self.build(
			bspump.socket.TCPStreamSource(app, self),
			IPAddressEnricher(app, self),
			bspump.elasticsearch.ElasticSearchSink(app, self, "ESConnection")
		)

Build of analyzers

Analyzer is a specific type of processor which behaves as an observer. Unlike other processors, it does not change input events but executes analysis over them. The output can then take the form of a concrete measurement or a mathematical operation over defined event items, for instance summing specific item values. For more information and guidance, see the Analyzer section below.

The best practice is to initialize analyzers in the same way as processors, i.e. directly in the pipeline. The most important methods are evaluate, which inserts events into the analyzer's matrix (see the Analyzer section below), and analyze, which, periodically or on an external event (sent through the publish-subscribe mechanism), performs analysis and/or other computation and typically sends the result to a secondary pipeline, which is suitable for aggregated events.

Never use secondary pipeline for simple events (see below), but only for complex events that are produced as output of the analyze method.

Optimization

The first step in optimizing the application is to avoid long-running synchronous processing code inside the pipeline and its processors. Instead, you can use out-of-band (OOB) processing, which moves the long-running processing of events outside of the synchronous pipeline, either to a separate thread or to asynchronous processing. A generic illustration of the thread-offloading idea follows.
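
The following is a minimal, generic asyncio sketch of the thread-offloading idea, not the BSPump OOB API itself; blocking_work and the "processed" flag are hypothetical placeholders (for the actual OOB mechanism, see the examples in the BitSwanPump repository):

import asyncio
import time


def blocking_work(event):
	# Hypothetical long-running synchronous task
	time.sleep(1)
	event["processed"] = True
	return event


async def process_out_of_band(event):
	# Offload the blocking call to the default thread pool executor,
	# so the pipeline's event loop keeps serving other events meanwhile
	loop = asyncio.get_running_loop()
	return await loop.run_in_executor(None, blocking_work, event)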

Other optimizations are based on measuring the basic pipeline metrics.

Stabilization

Every pipeline contains basic metrics that you can use to measure the speed of the pipeline:

  • number of processed events per second
  • duty cycle
  • time drift

Time drift is not an implicit metric of the pipeline, but it can be obtained via the Time Drift Analyzer processor inserted into the pipeline (see the Time Drift Analyzer section below). For more information about the duty cycle, see the following description: https://en.wikipedia.org/wiki/Duty_cycle

All metrics can be seen in the log line or configured to be sent to an external database, such as InfluxDB. To do so, include the following lines in the configuration file:

[asab:metrics]
target=influxdb

[asab:metrics:influxdb]
url=<YOUR_INFLUXDB_URL>
db=<YOUR_DATABASE>
username=<YOUR_USERNAME>
password=<YOUR_PASSWORD>

To also enable logging to file, use the following configuration snippet:

[logging:file]
path=<YOUR_LOG_FILE>
backup_count=3
rotate_every=1d

Then you can connect your InfluxDB with Grafana and create a graph where you can see and measure the output of the metrics. You can then test which processors slow down the pipeline (by disabling them) and, through the duty cycle, measure the performance and influence of externally connected systems like Kafka or ElasticSearch.

PyPy

Every BitSwan application can be run with PyPy to boost performance more than 5 times.

To download PyPy and use it instead of the default Python interpreter to run your code, follow this link: https://pypy.org/

Identification and location of pipeline objects

Every top-level BitSwan object, such as a pipeline, connection, lookup or matrix, as well as every member of a pipeline, has an ID. The ID serves to identify the object within the BSPump application, so it can be located and configured specifically.

The ID is implicitly constructed from the object's class name, so an instance of the class MyPipeline will have the ID MyPipeline. The ID can be overridden in the constructor of the top-level object (or pipeline member), e.g. MyPipeline(app, "MyPipelineID"), so that it is used for registration and configuration purposes instead of the class name.

The registration and retrieval of BSPump's top-level objects is done using the BSPump service, which can be obtained inside the application object via the app.get_service("bspump.PumpService") call. The service takes care of the life cycle of the top-level objects as well as their execution (in the case of pipelines).

svc.add_pipeline(MyPipeline(app, "MyPipelineID"))

The second parameter in the constructor of top objects is an ID that the service will use as a key to store the objects. The configuration will then use the ID as well instead of the class name (see Configuration section below).

Every object that has access to the service or to the application object itself can then locate other registered objects. This is especially useful when a processor needs to locate a connection, a processor inside a different pipeline or another pipeline itself.

processor = svc.locate("MyPipelineID.*InternalSource")

Configuration

Every BitSwan object inside the BSPump application (including the application object itself) can be configured using user-defined configuration options. Every application, pipeline, lookup, processor etc. is a Configurable object and can access its configuration options via the self.Config attribute, which is basically a dictionary with all the defined options.

It is a good practice to have all possible configuration options specified in the code together with the default values by defining ConfigDefaults dictionary inside the specific class:

class MyServerConnection(Connection):

	ConfigDefaults = {
		'servers': 'localhost:8080',
		'output_queue_max_size': 100,
	}

Please note that ConfigDefaults overrides ConfigDefaults from inherited classes, so the final set is a composition of ConfigDefaults entries: all options from the parent class are preserved in its children, and those specified in the child classes override the options from the parent class.

This ensures that all configuration options will always be present in the Configurable object, accessible via self.Config. The default values can be overridden either by external configuration files linked via the -c switch, or using the config parameter in the object's constructor. Values specified in the constructor always take priority, hence a given configuration option specified in a file will not be propagated to the object. The configuration file format is based on Python's native configuration file parser (for more information, see https://docs.python.org/3/library/configparser.html).

[connection:MyServerConnection]
servers=production:8080
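
For comparison, a minimal sketch of overriding the same option via the config parameter in the constructor (the values are illustrative):

svc.add_connection(
	MyServerConnection(app, "MyServerConnection", config={
		'servers': 'production:8080',  # takes priority over configuration files and ConfigDefaults
	})
)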

The configuration sections specified in configuration files should contain a prefix based on the nature of the BSPump top-level object, i.e. whether it is registered inside the application as a lookup, pipeline, connection etc. Processors and other objects that are part of the previously mentioned top-level objects should be prefixed by the top-level object's ID. The ID for top-level objects as well as processors and other native BSPump objects can be specified in the constructor (see the Identification and location of pipeline objects section above).

[pipeline:MySamplePipeline:MyProcessor]
process_events=50

The following table illustrates how the configuration section name looks for every top object and member of the pipeline:

Object type   Section name
Connection    [connection:<CONNECTION_ID>]
Lookup        [lookup:<LOOKUP_ID>]
Pipeline      [pipeline:<PIPELINE_ID>]
Processor     [pipeline:<PIPELINE_ID>:<PROCESSOR_ID>]
Source        [pipeline:<PIPELINE_ID>:*<SOURCE_ID>]
Sink          [pipeline:<PIPELINE_ID>:<SINK_ID>]

For more information about how the configuration works, refer to the ASAB documentation: https://asab.readthedocs.io/en/latest/asab/config.html.

Application object

The application object constitutes the runnable part of a project based on BSPump. It contains a service called PumpService, based on the open-source ASAB library, which serves for the registration of pipeline objects (administering data streams), connection objects and lookup objects.

Pipeline objects in BSPump

The following code illustrates the creation of an application object, the addition of a connection object for connecting to an ElasticSearch database, and the addition of the actual pipeline object, which can locate and use the given connection, for instance as a source of events or for their subsequent storage.

import bspump
import bspump.elasticsearch

app = bspump.BSPumpApplication()
svc = app.get_service("bspump.PumpService")
svc.add_connection(bspump.elasticsearch.ElasticSearchConnection(app, "ESConnection"))
svc.add_pipeline(MyPipeline(app))
app.run()

Pipeline

In BSPump, data are processed in so-called pipelines. Individual pipeline objects work asynchronously and independently of one another (unless a dependence is defined explicitly, for instance on a message source from another pipeline) and can be run in unlimited numbers. Each pipeline is usually in charge of one concrete task.

Pipeline diagram

Each pipeline always contains three types of components:

  • Source
  • Processor
  • Sink

A pipeline can consist of multiple source objects, which makes it possible to connect to multiple data streams. It can also be made up of multiple processors (e.g. several parsers and enrichers) and thus carry out a larger number of elementary tasks. Each pipeline contains only one sink object where its output is stored.

The following code shows the definition of a pipeline object made up of one source, one processor and one sink.

import bspump
import bspump.common
import bspump.elasticsearch
import bspump.socket


class MyPipeline(bspump.Pipeline):

	def __init__(self, app):
		super().__init__(app)
		self.build(
			bspump.socket.TCPStreamSource(app, self),
			bspump.common.JSONParserProcessor(app, self),
			bspump.elasticsearch.ElasticSearchSink(app, self, "ESConnection")
		)

Primary pipeline

A primary pipeline is a pipeline registered within the application that uses connections to external services to obtain data from and send data to; it is thus not dependent on events produced by any other pipeline that is part of the application.

The events typically processed by a primary pipeline are called basic events, since no aggregation is done on them before they enter the pipeline inside the BSPump application.

Secondary pipeline

A secondary pipeline is a pipeline registered within the application which is dependent on events produced by some other (usually primary) pipeline. The dependency usually manifests itself in the secondary pipeline's InternalSource, through which the primary pipeline inserts events.

The secondary pipeline expects events that are less frequent than the ones processed by the primary pipeline that feeds it. A typical use case is the output of an analyzer (see below) being sent via the secondary pipeline.

The events typically processed by a secondary pipeline are called complex events, because they originate as aggregations produced e.g. by analyzers located in the primary pipeline. The complex events are inserted by the analyzer into the secondary pipeline to be subsequently processed. This mechanism is called Complex Event Processing: https://en.wikipedia.org/wiki/Complex_event_processing A sketch of a secondary pipeline follows.
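
A minimal sketch of a secondary pipeline built around bspump.common.InternalSource; the put call and the IDs are illustrative, and the exact InternalSource interface should be checked against the BSPump version in use:

import bspump
import bspump.common
import bspump.elasticsearch


class MySecondaryPipeline(bspump.Pipeline):

	def __init__(self, app):
		super().__init__(app)
		self.build(
			bspump.common.InternalSource(app, self),
			bspump.elasticsearch.ElasticSearchSink(app, self, "ESConnection")
		)

# Inside the primary pipeline's analyzer, the source may then be located and fed:
# source = svc.locate("MySecondaryPipeline.*InternalSource")
# source.put(context, complex_event)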

Every pipeline provides a quick way to obtain important objects, such as the application object via the App attribute. Thus, every attribute and method that is available in the application is also accessible from the pipeline. The following table lists important attributes of top-level BitSwan objects:

Attribute    Application     Pipeline            Connection   Source    Processor / Sink   Lookup    Comment
Id           -               Id                  Id           Id        Id                 Id        Identification of the object, specified in the constructor.
App          -               App                 App          App       App                App       Application object.
Loop         Loop            via App             via App      via App   via App            via App   Application loop, which can be used in native asyncio methods.
PubSub       PubSub (app)    PubSub (pipeline)   -            -         -                  -         Application-level or pipeline-level publish-subscribe object.
Sources      -               Sources             -            -         -                  -         List of sources.
Processors   -               Processors          -            -         -                  -         List of processors.

The PumpService can be obtained in the same way as in the application object (svc = self.App.get_service("bspump.PumpService")). When a processor (usually a source or a sink) needs to locate a connection, lookup, matrix or any other top-level or pipeline object, it can utilize the pump service:

connection = svc.locate_connection("MyConnection")

For instance, MyConnection can be a KafkaConnection, yielding a connection able to consume or produce Kafka messages from within the processor. There is also a built-in locate_connection method directly on the pipeline object, which locates the PumpService and obtains the connection in the same way:

connection = pipeline.locate_connection(app, "MyConnection")

The PumpService can also be used to obtain processors from other pipelines. The most common case is to locate the InternalSource of a secondary pipeline, to which the primary pipeline can push events:

source = svc.locate("MyPipelineID.*InternalSource")

For more information, see section Identification and location of pipeline objects above.

Source

A source is an object designed to obtain data from a predefined input. BSPump contains many universally usable source objects capable of loading data from well-known data interfaces. The BitSwan product further expands these with source objects directly usable for specific use cases in a given industry field.

Kafka Source

Streaming source vs. triggered source

Sources are by default created as so-called streaming sources, which means that events can enter them in real time as they are delivered by the input technology. This holds, for instance, for messaging technologies such as Kafka or RabbitMQ.

For other sources, such as SQL databases or files, real-time streaming is not possible. Such sources are called triggered sources. Triggered sources have to be triggered by an external event or, for instance, by a repeating timer, e.g. loading data from a database every 5 minutes or reading data from files on a shared disk where other systems record them. Triggered sources are not intrinsically real-time; however, they can be considered near real-time sources depending on the input data. A sketch of a triggered source follows.
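
A minimal sketch of a pipeline with a triggered file source driven by a periodic timer; the PeriodicTrigger name, its interval argument and the .on() chaining are assumptions based on common BSPump examples and may differ between versions:

import bspump
import bspump.common
import bspump.file
import bspump.trigger


class CSVImportPipeline(bspump.Pipeline):

	def __init__(self, app):
		super().__init__(app)
		self.build(
			# Re-read the CSV file every 300 seconds (path and interval are illustrative)
			bspump.file.FileCSVSource(app, self, config={'path': './data.csv'}).on(
				bspump.trigger.PeriodicTrigger(app, 300)
			),
			bspump.common.PPrintSink(app, self),
		)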

The BSPump for instance contains the following types of source objects:

  • RabbitMQ
  • HTTP client
  • HTTP server
  • Kafka
  • MySQL
  • TCP
  • File
  • CSV files
  • JSON files

Source objects can easily be created and thus adjusted to your own needs and data sources.

Processor

The main component of the BSPump architecture is a so-called processor. This object modifies, transforms and enriches events. Moreover, it is capable of calculating metrics, creating aggregations, detecting anomalies or reacting to known as well as unknown system behavior patterns.

Processors differ in their functions, and all of them are arranged in a predefined sequence within pipeline objects. As regards working with data events, each pipeline has its own unique task.

Processors in a pipeline

BSPump is designed as a modular, open system. It is thus possible for a knowledgeable user to create and register their own processor and then focus purely on the algorithm (the computing method) inside it. The object is administered by internal BSPump mechanisms, so the user is shielded from them. Creating a simple processor usually takes a few minutes, which lets the user focus on the concrete task the processor is supposed to execute within the project.

Parser

A parser is a type of processor which decomposes incoming data blocks into smaller logical entities, i.e. events containing key and value fields, so that subsequent processors in the pipeline can process and widen the data structure more conveniently. The parser output is structured data in the form of key and value fields (so-called dictionaries) with some added value for the user.

Parser

Decomposing data into smaller blocks by means of pre-defined rules is executed for the purpose of their simpler interpretation, administration and easier processing by subsequent processors.

Examples of parsers:

  • JSON parser - converts data from JSON format into events in dictionary format
  • AVRO parser - converts data from AVRO format into specific events
  • XML parser - converts data from XML format into specific events

Example of a parser from the telecommunication field:

  • XDR S1 - converts data records with information from the S1 mobile signalling protocol into specific events

The openness of BitSwan allows users to write their own parsers, so they can add new data sources faster and gain the advantage of having data converted into structured forms.

Filter

A filter is a type of processor that allows filtering of incoming events based on information associated with them. Filters may either drop events that do not match the filter conditions, or pass them on to the same or a different pipeline with flags, labels etc. Other processors in the pipelines should then behave according to the flag specified in the event.

ContentFilter is part of the bspump.filter module and its purpose is to filter incoming events based on a MongoDB-like query, specified in the query attribute of the constructor. For instance, the following query expects the attributes user and num to be present in the incoming event, and filters users based on their ID (user_0, user_2, user_10) and numbers based on their size (the number should be greater than or equal to 500):

"$or":
[
	{
		"user":
		{
			'$in':["user_0", "user_2", "user_10"]
		}
	}, 
	{
		"num":
		{
			"$gte": 500
		}
	}
]

A specific implementation of the filter should subclass the generic ContentFilter class and override its methods on_hit and on_miss, which process the filtered events that either matched (on_hit) or did not match (on_miss) the query. The behavior of ContentFilter is similar to the query feature in SQLstream. For an implementation of these methods and a sample Mongo query, see the example on GitHub: https://github.com/LibertyAces/BitSwanPump/blob/master/bspump-content-filter.py
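
A minimal sketch of such a subclass, assuming the query is passed to the ContentFilter constructor and that returning the event passes it on to the next processor; the "matched" flag is illustrative:

import bspump.filter


class MyContentFilter(bspump.filter.ContentFilter):

	def on_hit(self, context, event):
		# The event matched the query: mark it and pass it on
		event["matched"] = True
		return event

	def on_miss(self, context, event):
		# The event did not match: pass it on unmarked
		# (alternatively it could be dropped or routed elsewhere here)
		return event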

TimeDriftFilter, also part of the bspump.filter module, drops events whose @timestamp attribute (in seconds) is older than the current time minus the configured max_time_drift (in seconds).

AttributeFilter, also in the bspump.filter module, filters events based on a specific attribute present in the event (which is expected to be a Python dictionary).

Enricher

An enricher is a type of processor which looks up information relating to a specific event item and subsequently adds the result to the event as a new additional item. In this way, the data structure is widened with other useful information. If this information becomes known and common, it is called a dimension. The enricher effectively increases the value of the data.

Enricher

Example of an enricher:

  • GEO IP enricher – adds location information to events according to the device IP addresses available in them, based on a table containing IP addresses and their GEO positions (countries, regions, cities)

The openness of BitSwan allows users to write their own enrichers, which can immediately be used for enriching data with new dimensions that bring additional value (e.g. for map visualisations, further processing etc.).

Generator

A generator is a type of processor which decomposes large data records into multiple events. It is mainly used for data records made up of many different measurements or aggregated values. The generator expects a large generic record at its input and creates separate data events.

Example of a generator:

  • CSV generator - expects a CSV data file at the input and creates a specific event for each line – the columns thus become keys in the resulting event dictionary (see the sketch below)
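
A rough sketch of a generator that splits one large record into per-item events; it assumes the bspump.Generator interface with an asynchronous generate(context, event, depth) method and Pipeline.inject, which may differ between BSPump versions:

import bspump


class RecordSplitGenerator(bspump.Generator):

	async def generate(self, context, event, depth):
		# The incoming event is assumed to be a large record containing
		# a list of measurements; each measurement becomes a separate event
		for measurement in event.get("measurements", []):
			self.Pipeline.inject(context, measurement, depth)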

Transformer

A transformer is a processor which transforms one type of data record into another. This enables the user to unify data in a simple way. For more information see lookups (transformers typically make use of lookups to manage the unification process).

Router

Router is an advanced type of processor which is necessary for dispatching events into other pipeline objects.

The router analyzes incoming events and is capable of executing one of the following operations over them:

(a) transfers them into other existing pipeline objects depending on some event value

(b) creates a new event and dispatches it into other existing pipeline objects

The routing process is based on the internal routing logic of the BSPump project, which lets the user decide about cloning, decomposing and terminating events or creating new ones. The routing logic offers extended creative possibilities when working with events.

Router

A router-type processor is suitable wherever incoming data needs to be processed by some other pipeline object. In such cases routers are used for (a) decomposing or cloning events into new pipelines where they can be processed, and (b) creating new events (depending on input event values) and dispatching them into other pipelines for subsequent processing. The new or decomposed events then enter other pipelines where their processing continues. One event can enter multiple internal pipeline objects.

Analyzer

Analyzer is a specific type of processor which behaves as an observer. Unlike other processors, it does not change input events but executes analysis over them. The output can then take the form of a concrete measurement or a mathematical operation over defined event items, for instance summing specific item values. Some analyzers operate on a Matrix, a top-level object of BSPump.

Analyzer

The analyzer then dispatches the measurement result (e.g. into some other pipeline) in the form of a new event, depending on a defined trigger (for instance once every 60 seconds).

Resulting events, which constitute the observation of the analyzer, can subsequently create an aggregated view of the data stream or trigger a more complex processing of it.

The nature of the so-called triggers for obtaining results from the analyzer can differ. The analyzer can be triggered by some other pipeline object, by elapsed time or by some other condition within the analyzer. For a time trigger, two types of time are used: the current time and the time of the event being streamed (this information is usually contained in one of the event items).

Any analyzer is a processor, which means it has a process() function. By default it is:

def process(self, context, event):
	if self.predicate(context, event):
		self.evaluate(context, event)
	return event

predicate() should filter incoming events and return a boolean; by default it always returns True. evaluate() is the main function, where the information should be aggregated.

Example:

	def evaluate(self, context, event):
		row_index = self.TimeWindow.get_row_index(event['some_id'])
		if row_index is None:
			row_index = self.TimeWindow.add_row(event['some_id'])

		column_index = self.TimeWindow.get_column(event['@timestamp'])  # not needed if a SessionMatrix is used
		if column_index is None:
			# do nothing
			return

		self.TimeWindow.Array[row_index, column_index] = event['some_attribute']
		# self.Sessions.Array['some_column'][row_index] = event['some_attribute']
		...

Each analyzer has an analyze() function, which can be called internally or externally, depending on your needs. Typically analyze() produces events based on the analyzed object (usually a Matrix) and sends them onward, for example to a secondary pipeline. It is specific to each analyzer. If the analyzed object is a Matrix, it is not recommended to iterate through the matrix row by row (or cell by cell). Instead, use numpy functions. Examples (a short runnable demonstration follows the list):

  1. You have a vector with n rows. You need only those row indices where the cell content is more than 10. Use np.where(vector > 10).
  2. You have a matrix with n rows and m columns. You need to find out which rows consist entirely of zeros. Use np.where(np.all(matrix == 0, axis=1)) to get those row indexes. Instead of np.all() you can use np.any() to get all row indexes where there is at least one zero.
  3. Use np.mean(matrix, axis=1) to get the means of all rows.
  4. Useful numpy functions: np.unique(), np.sum(), np.argmin(), np.argmax().
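
A small self-contained numpy demonstration of the operations listed above, using a random matrix as a stand-in for an analyzer's matrix:

import numpy as np

matrix = np.random.randint(0, 3, size=(5, 4))  # stand-in for an analyzer's matrix
vector = matrix[:, 0]

rows_over_10 = np.where(vector > 10)[0]                    # row indices where the first column exceeds 10
zero_rows = np.where(np.all(matrix == 0, axis=1))[0]       # rows consisting entirely of zeros
rows_with_zero = np.where(np.any(matrix == 0, axis=1))[0]  # rows containing at least one zero
row_means = np.mean(matrix, axis=1)                        # mean of every row

print(rows_over_10, zero_rows, rows_with_zero, row_means)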

The analyzer is a universal BitSwan component and brings high value to users – they can quickly define new measurements or computing conditions while setting trigger conditions creatively.

For more information about different types of analyzers please read the section Data analysis.

Sink

A sink object serves as the final event destination within the given pipeline. From there, the event is dispatched/written into the external system by BSPump.

ElasticSearch Sink

Examples of sink types:

  • Rabbit
  • Elasticsearch
  • HTTP server
  • InfluxDB
  • Kafka
  • MySQL
  • Apache Parquet
  • Slack
  • File
  • CSV files

Connection

Connection objects serve for making a connection with some other item or service within the infrastructure. They are then located and used within individual pipelines, primarily in source, processor and sink objects, which use them e.g. for obtaining data from or dispatching data to the service the connection object relates to.

As an example, a KafkaSource object can be used, which expects a KafkaConnection for making a connection with a Kafka instance within an organization, where the service location together with all required login data is read from the BSPump application configuration. A sketch of such a configuration follows.
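
A sketch of how such a connection might be registered (the bspump.kafka module is assumed to provide KafkaConnection):

svc.add_connection(bspump.kafka.KafkaConnection(app, "KafkaConnection"))

The corresponding configuration section could then look as follows; the bootstrap_servers option name is an assumption to be checked against the KafkaConnection implementation:

[connection:KafkaConnection]
bootstrap_servers=kafka1:9092,kafka2:9092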

Lookup

Lookups serve for fast data searching in key-value lists. They can subsequently be located and used in pipeline objects (processors and the like). Each lookup requires a statically or dynamically created value list.

A dynamic lookup can thus be changed while the application is running, for instance by values from incoming events. The BSPump internal logic also makes it possible to synchronize lookups among individual pump instances, provided this is specified in the configuration. In this case one pump instance serves as the publisher (master) of a given lookup, while the others can be switched to the subscriber (slave) mode. The actual lists are then stored in an internal storage and used the next time the application is started.

Lookups can be created from static files or directly from streamed events. As an example, a lookup of vehicles by their IDs can be used. Locating a registered lookup is sketched below.
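
A short sketch of registering and locating a lookup via the pump service; locate_lookup is assumed to exist alongside the documented add_lookup and locate_connection methods, and VehicleLookup is a hypothetical lookup class:

svc.add_lookup(VehicleLookup(app, "VehicleLookup"))

# Later, e.g. inside an enricher, the registered lookup can be located by its ID:
lookup = svc.locate_lookup("VehicleLookup")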

Matrix

Matrices are another category of BSPump top-level objects, alongside pipelines, lookups and connections. Matrices are used to create storage with defined rows and columns that is shared among other objects, such as analyzers and other types of processors.

The core of the matrix is a NumPy multidimensional fixed-size array with preallocated memory. The content of the matrix cells must be predefined as integer, float or string. The matrix also contains additional structures: RowMap and RevRowMap to translate numerical indices of the matrix into specific IDs, Storage to keep associated Python structures such as lists and dictionaries with relevant information for each element in the matrix (e.g. personal information, if one element of the matrix is a person), and ClosedRows to remember rows to be eventually deleted.

An example usage of a matrix can be seen in the GeoAnalyzer, where it is used to store geographical elements such as locations of persons, vehicles etc. The matrix can then be analyzed and used by other processors within the BSPump application.

The pump service provides add_matrix and locate_matrix methods, similar to the add_pipeline, add_lookup and other methods used to store and handle the other top-level objects.

For more information about different types of matrices please read the section Data analysis.

Event

An event is a data unit which is sent through a pipeline and its processors. The most frequent event form is a structured dictionary, i.e. fields with keys and values. In this form it can easily be written into an ElasticSearch database, for instance.

However, the nature of events can be arbitrary. In such a case it is necessary to ensure that each subsequent processor expects the same data format that the current one produces at its output – this holds true for every pipeline.

Messages

BSPump implicitly produces messages in a publish-subscribe (PubSub) object that describe state changes of the pump and its pipelines and allow subscribed objects to react accordingly. The following table lists all important default messages published by BitSwan top-level objects. For more information about PubSub and how you can subscribe to messages, please refer to the ASAB documentation: https://asab.readthedocs.io/en/latest/asab/pubsub.html

A PubSub object may be created specifically for a particular object, such as a specific connection, or the producer may use the pipeline's PubSub or the application's PubSub object. The "PubSub object" column of the following table lists the PubSub object to which each producer publishes its messages.

Message Produced by PubSub object Description
bspump.pipeline.start! Pipeline Pipeline.PubSub Published when the specific pipeline has started.
bspump.pipeline.clear_error! Pipeline Pipeline.PubSub Published when an error is cleared from the pipeline and it starts checking whether it can be restarted again.
bspump.pipeline.warning! Pipeline Pipeline.PubSub Published when a warning is set to the pipeline.
bspump.pipeline.error! Pipeline Pipeline.PubSub Published when an error is set to the pipeline and pipeline is about to be stopped.
bspump.pipeline.ready! Pipeline Pipeline.PubSub Published when the pipeline is ready to process events again.
bspump.pipeline.not_ready! Pipeline Pipeline.PubSub Published when the pipeline is not ready to process events (because of a throttle object etc.).
bspump.Lookup.changed! Lookup Lookup.PubSub Published when lookup was updated (data reloaded).
bspump.pipeline.cycle_begin! TriggerSource Pipeline.PubSub Published when the specific pipeline’s source is triggered to process events.
bspump.pipeline.cycle_canceled! TriggerSource Pipeline.PubSub Published when the specific pipeline’s source’s trigger processing is cancelled by the user.
bspump.pipeline.cycle_end! TriggerSource Pipeline.PubSub Published when the specific pipeline’s source’s trigger processing of events ended.
bspump.file_source.no_files! FileABCSource Pipeline.PubSub Published when file source has no more files to process.

The following messages are produced by ASAB and can also be subscribed to in BSPump applications:

Message Produced by PubSub object Description
Application.init! Application Application.PubSub This message is published when the application is in the init-time. It is actually one of the last things done in init-time, so the application environment is almost ready for use. This means that the configuration is loaded, logging is set up, the event loop is constructed etc.
Application.run! Application Application.PubSub This message is emitted when the application enters the run-time.
Application.stop! Application Application.PubSub This message is emitted when the application wants to stop the run-time. It can be sent multiple times because of the process of graceful run-time termination. The first argument of the message is a counter that increases with every Application.stop! event.
Application.exit! Application Application.PubSub This message is emitted when the application enters the exit-time.
Application.hup! Application Application.PubSub This message is emitted when the application receives the UNIX signal SIGHUP or equivalent.
Application.tick! Application Application.PubSub The application periodically publishes "tick" messages. The default tick frequency is 1 second, but you can change it with the [general] tick_period configuration option. Application.tick! is published every tick, Application.tick/10! every 10th tick and so on.

The following messages are also available:

  • Application.tick/10! (once per ten seconds)
  • Application.tick/60! (once per one minute)
  • Application.tick/300!
  • Application.tick/600!
  • Application.tick/1800!
  • Application.tick/3600! (once per one hour)
  • Application.tick/43200!
  • Application.tick/86400!

Default API

Every BSPump application that is run with the -w switch provides a default API with the following endpoints, which serve to list pipelines, metrics, the content of lookups etc.

The default port the API runs on is 80. The address and port can be reconfigured in the -w switch in the format -w "IP_ADDRESS:PORT", for example as shown below.
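
For instance, to expose the API on port 8080 (the application file name is a placeholder):

python3 ./<Your_application_name>.py -w "0.0.0.0:8080"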

Endpoint              Description
/pipelines            Obtain a list of pipelines registered in the application together with all processors.
/lookup/{lookup_id}   Obtain the content of a lookup specified by its ID.
/metric               Obtain a list of all metrics registered in the application.
/metric/{metric_id}   Obtain values and other details of a metric specified by its ID.
/manifest             Obtain version and other manifest information.

The documentation of the default BSPump API together with simple examples is available in PostMan: https://documenter.getpostman.com/view/3254207/SVSRJRxT?version=latest

ElasticSearch

ElasticSearch is a schema-less database and search engine which makes it possible to store data and quickly obtain data and required aggregations on individual nodes. Data are kept in the form of JSON documents stored within so-called indexes, in which ElasticSearch enables their versioning. The size of indexes can be changed automatically to reach maximum scalability, depending on data size or a time window.

For storing data and querying them, ElasticSearch uses a REST API. After simple configuration, ElasticSearch can be run in a cluster where the individual computing nodes elect a master node by themselves.

By means of a connection object, BSPump enables connection to an ElasticSearch instance, which is mainly used for storing individual events.

For more information go to ElasticSearch product documentation website: https://www.elastic.co/guide/index.html

bselastic

BSElastic is a useful tool available within the BSPump project in the utils directory.

usage: bselastic [-h] {load_index_template,cleanup,reopen,close,delete} ...

This tool enables you to perform the following operations:

  • load_index_template - loads index templates from a given directory into a specified ElasticSearch instance
  • cleanup - closes and cleans up empty indexes
  • reopen - reopens indexes within a specified time interval
  • close - closes indexes within a specified time interval
  • delete - deletes indexes within a specified time interval

For more information go to GitHub repository: https://github.com/LibertyAces/BitSwanPump/tree/master/utils

Kibana

Kibana is a monitoring and visualization tool running over data obtained from an ElasticSearch database. It enables simple creation of visualizations, dashboards and time graphs capable of displaying the development of more complex metrics and aggregations over time. Visualizations consist of tables, graphs, diagrams or maps and thus give a quick insight into the nature of the information contained in the data.

For more information go to Kibana documentation website: https://www.elastic.co/guide/en/kibana/index.html

bskibana

BSKibana is a useful tool available within the BSPump project in the utils directory:

usage: bskibana [-h] [--verbose] [--silent] {export,import,decompile,compile,document}

This tool enables you to perform the following operations:

  • export - exports Kibana index from a specified ElasticSearch instance into a file
  • import - imports Kibana index from a file into a specified ElasticSearch instance
  • decompile - decompiles a file with Kibana index into Kibana Object Library (see below for details)
  • compile - compiles Kibana Object Library into a file
  • document - creates documentation
  • rename - renames an ID index of patterns to referred prefixes of ElasticSearch indexes

For more information go to GitHub repository: https://github.com/LibertyAces/BitSwanPump/tree/master/utils

Kibana Object Library

The Kibana Object Library is a directory structure with objects, created by splitting the Kibana index into directories according to the types of the individual objects.

Examples of objects:

  • Kibana settings
  • index patterns
  • visualizations
  • dashboards
  • lookups (see below)

By means of the BSKibana tool, the Kibana Object Library makes it possible to export and import the above-mentioned objects into/out of files and subsequently version these files. The files are administered by the Git version control system.

Kibana Lookup Plugin

The Kibana Lookup Plugin enables Kibana to create value lookups for specified columns within an index pattern. For instance, for a column called ResponseCode it is possible to create a lookup which assigns a verbal description (e.g. "OK") to a given value – the description is subsequently displayed in Kibana together with the value.

By means of BSPump the Kibana user can create a lookup and load it into ElasticSearch.