Software engineering

Introduction

BSPump

BSPump is an open-source project written in Python that any developer can use as the basis of a software solution built on real-time data analysis. It forms the core of the BitSwan product, which further expands the BSPump project for individual use cases with its own microservices, configurations and other components.

For more information go to GitHub repository: https://github.com/LibertyAces/BitSwanPump

Where to start

This repository can be cloned and used as a template for one's own BSPump application: https://github.com/LibertyAces/BitSwanTelco-BlankApp

The following chapters serve as a guide to creating one's own application and describe the essential objects, tools and libraries related to BSPump.

Application object

The application object constitutes the runnable part of a project based on BSPump. It contains a service called PumpService, based on the open-source ASAB library, which serves for registering pipeline objects (which administer data streams), connection objects and lookup objects.

Pipeline objects in BSPump

The following code illustrates the creation of an application object, the addition of a connection object for connecting to an ElasticSearch database, and the addition of the actual pipeline object, which can locate and use the given connection, for instance as a source of events or as a destination for their storage.

import bspump
import bspump.elasticsearch

app = bspump.BSPumpApplication()
svc = app.get_service("bspump.PumpService")

# Register a connection to ElasticSearch under the id "ESConnection"
svc.add_connection(bspump.elasticsearch.ElasticSearchConnection(app, "ESConnection"))

# Register the pipeline (defined below) and start the application
svc.add_pipeline(MyPipeline(app))
app.run()

Pipeline

In BSPump, data are processed in so-called pipelines. Individual pipeline objects work asynchronously and independently of one another (unless a dependency is defined explicitly, for instance on a message source in another pipeline), and any number of them can run at once. Each pipeline is usually in charge of one concrete task.

Pipeline diagram

Each pipeline always contains three types of components:

  • Source
  • Processor
  • Sink

A pipeline can consist of multiple source objects, which allows it to connect to multiple data streams. It can, of course, also be made up of multiple processors (e.g. several parsers and enrichers) and thus carry out a larger number of elementary tasks. Each pipeline contains exactly one sink object, where its output is stored.

The following code shows the definition of the actual pipeline object, made up of one source, one processor and one sink.

import bspump
import bspump.common
import bspump.elasticsearch
import bspump.socket

class MyPipeline(bspump.Pipeline):

	def __init__(self, app):
		super().__init__(app)
		self.build(
			bspump.socket.TCPStreamSource(app, self),  # source: reads events from a TCP stream
			bspump.common.JSONParserProcessor(app, self),  # processor: parses JSON into dictionaries
			bspump.elasticsearch.ElasticSearchSink(app, self, "ESConnection"),  # sink: stores events
		)

Source

A source is an object designed to obtain data from a predefined input. BSPump contains many universally usable source objects capable of loading data from well-known data interfaces. The BitSwan product further expands these with source objects directly usable for specific use cases in a given industry.

Kafka Source

Streaming source vs. triggered source

Streaming sources receive events in real time, as they are delivered by the input technology. This holds, for instance, for messaging technologies such as Kafka or RabbitMQ.

For other sources, such as SQL databases or files, real-time streaming cannot be set up. Such sources are called triggered sources. A triggered source has to be triggered by an external event or, for instance, by a repeating timer: e.g. loading data from a database every 5 minutes, or reading data from files on a shared disk where other systems record them. Triggered sources are not intrinsically real-time; however, depending on the input data, they can be considered near-real-time sources.
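
For illustration, the following sketch shows a pipeline with a periodically triggered file source; it assumes the bspump.file.FileCSVSource, bspump.trigger.PeriodicTrigger and bspump.common.PPrintSink classes, whose exact names and signatures may differ between BSPump versions.

import bspump
import bspump.common
import bspump.file
import bspump.trigger

class PeriodicFilePipeline(bspump.Pipeline):

	def __init__(self, app):
		super().__init__(app)
		self.build(
			# Re-read the CSV file every 300 seconds (5 minutes)
			bspump.file.FileCSVSource(app, self, config={
				'path': './data.csv',
			}).on(bspump.trigger.PeriodicTrigger(app, 300)),
			bspump.common.PPrintSink(app, self),
		)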

The BSPump for instance contains the following types of source objects:

  • RabbitMQ
  • HTTP client
  • HTTP server
  • Kafka
  • MySQL
  • TCP
  • File
  • CSV files
  • JSON files

Source objects can easily be created and thus adjusted to one's own needs and data sources.

Processor

The main component of the BSPump architecture is the so-called processor. This object modifies, transforms and enriches events. Moreover, it is capable of calculating metrics, creating aggregations, detecting anomalies, and reacting to known as well as unknown patterns of system behavior.

Processors differ in their functions, and within a pipeline object they are arranged in a predefined sequence. Each pipeline has its own unique task in working with data events.

Processors in a pipeline

BSPump is created as a modular open system, so a knowledgeable user can create and set up his/her own processor. The user can focus on the creation of the algorithm (the computing method) inside the processor, while the object itself is administered by the internal mechanisms of the BSPump project, which stay hidden from the user. Creating a simple processor usually takes a few minutes, which lets the user focus on the concrete task the processor is supposed to execute within the project.
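
As a minimal sketch, assuming the standard bspump.Processor base class, a custom processor only needs to override the process() method; the 'message' field used here is purely illustrative.

import bspump

class UpperCaseProcessor(bspump.Processor):
	"""Example processor that upper-cases the 'message' item of each event."""

	def process(self, context, event):
		# The event is typically a dictionary; return it (possibly modified)
		# to pass it on to the next component in the pipeline
		if 'message' in event:
			event['message'] = event['message'].upper()
		return event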

Parser

A parser is a type of processor which decomposes incoming data blocks into smaller logical entities, i.e. events containing key and value fields, so that subsequent processors in the pipeline can process and widen the data structure more conveniently. The parser output is structured data in the form of key and value fields (so-called dictionaries) with some added value for the user.

Parser

Data are decomposed into smaller blocks by means of pre-defined rules so that subsequent processors can interpret, administer and process them more easily.

Examples of parsers:

  • JSON parser - converts data from JSON format into events in dictionary format
  • AVRO parser - converts data from AVRO format into specific events
  • XML parser - converts data from XML format into specific events

Example of a parser from the telecommunication field:

  • XDR S1 - converts data records with information from the S1 mobile signalling protocol into specific events

BitSwan openness provides users with the possibility of writing their own parsers, so that new data sources can be added faster, gaining the advantage that ensues from transferring data into structured form.
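
A minimal sketch of such a parser, again assuming the bspump.Processor base class (and that returning None drops an event, as in BSPump); error handling is deliberately reduced:

import json

import bspump

class SimpleJSONParser(bspump.Processor):
	"""Example parser that decomposes a raw JSON line into a dictionary event."""

	def process(self, context, event):
		try:
			# The incoming event is expected to be bytes or str containing JSON
			return json.loads(event)
		except ValueError:
			# Unparseable events are dropped by returning None
			return None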

Enricher

An enricher is a type of processor which looks up information related to a specific event item and adds the result to the event as a new item. In this way, the data structure is widened with other useful information. In case this information becomes known and common, it is called a dimension. The enricher effectively increases the value of the data.

Enricher

Example of an enricher:

  • GEO IP enricher – adds location information to events according to the accessible device IP addresses, based on a table containing IP addresses and their GEO positions (countries, regions, cities)

The BitSwan openness provides users with the possibility of writing their own enrichers, which can immediately be used for enriching data with new dimensions that bring value (e.g. for map visualisations, further processing etc.).
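
A minimal sketch of such an enricher, assuming the bspump.Processor base class; the hard-coded IP-to-country table and the 'ip' item are illustrative stand-ins for a real GEO IP lookup:

import bspump

# Illustrative, hard-coded table; a real enricher would use a lookup or database
IP_TO_COUNTRY = {
	'198.51.100.1': 'DE',
	'203.0.113.7': 'CZ',
}

class CountryEnricher(bspump.Processor):
	"""Example enricher that adds a 'country' dimension based on the 'ip' item."""

	def process(self, context, event):
		# Add the new item; unknown addresses yield None
		event['country'] = IP_TO_COUNTRY.get(event.get('ip'))
		return event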

Generator

A generator is a type of processor which decomposes large data records into multiple events. It is mainly used for data records made up of many various measurements or aggregated values. The generator expects a large generic record at its input and creates separate data events.

Example of a generator:

  • CSV generator - expects a CSV data file at its input and creates a specific event for each line; the column names become keys in the resulting value fields (see the sketch below)
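
A minimal sketch of a generator, assuming the bspump.Generator base class with an asynchronous generate() method that injects new events back into the pipeline, as in recent BSPump versions; the 'measurements' item is illustrative:

import bspump

class MeasurementGenerator(bspump.Generator):
	"""Example generator that splits one large record into one event
	per contained measurement."""

	async def generate(self, context, event, depth):
		for measurement in event.get('measurements', []):
			# Inject each measurement into the pipeline as a standalone event
			await self.Pipeline.inject(context, measurement, depth)
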
Transformer

A transformer is a processor which transforms one type of data record into another. This enables the user to unify data in a simple way. For more information see lookups (transformers typically make use of lookups, which manage the unification process).

Router

A router is an advanced type of processor used for dispatching events into other pipeline objects.

The router analyzes incoming events and is capable of executing one of the following operations on them:

(a) transferring them into other existing pipeline objects, depending on some event value

(b) creating a new event and dispatching it into other existing pipeline objects

The routing process is based on the internal routing logic of the BSPump project, which lets the user decide whether to clone, decompose or finish events, or to create new ones. The routing logic offers extended creative possibilities when working with events.

Router

A router-type processor is suitable wherever incoming data need to be processed by some other pipeline object. In such cases routers are used for: (a) decomposing or cloning events into new pipelines, where they can be processed, (b) creating new events (depending on input event values) and dispatching them into other pipelines for subsequent processing. The new or decomposed events then enter other pipelines, where their subsequent processing starts. One event can enter multiple internal pipeline objects.
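
A minimal sketch of a router, modeled on the router example in the BSPump repository; it assumes a bspump.common.RouterSink whose locate() method returns the id of a target pipeline's bspump.common.InternalSource (the exact mechanism may differ between versions), and the 'severity' item is illustrative:

import bspump
import bspump.common

class SeverityRouterSink(bspump.common.RouterSink):
	"""Example router that dispatches events into different pipelines
	according to the 'severity' event value."""

	def locate(self, context, event):
		# Return the id of the target pipeline's internal source
		if event.get('severity') == 'error':
			return "ErrorPipeline.*InternalSource"
		return "DefaultPipeline.*InternalSource"

Each target pipeline would then begin with a bspump.common.InternalSource, which receives the routed events.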

Analyzer

An analyzer is a specific type of processor which behaves as an observer. Unlike other processors, it does not change input events but executes analysis over them. The output can be a concrete measurement or a mathematical operation which processes defined event items; for instance, it can add up specific item values.

Analyzer

The analyzer then dispatches the measurement result (e.g. into some other pipeline) in the form of a new event, depending on a defined trigger (for instance once every 60 seconds).

The resulting events, which constitute the observations of the analyzer, can subsequently create an aggregated view of the data stream or trigger more complex processing of it.

The nature of the so-called triggers for obtaining results from the analyzer can differ: the analyzer can be triggered by some other pipeline object, by measured time, or by some other condition within the analyzer. For the time trigger, two types of time are used: the current time and the time of the event being streamed (the latter is usually contained in one of the event items).

The analyzer is a universal BitSwan component and brings high value to users: they can quickly define new measurements or computing conditions while setting trigger conditions creatively.
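
A minimal sketch of an analyzer, assuming the bspump.analyzer.Analyzer base class, where evaluate() is called for every passing event and analyze() is invoked on a clock when analyze_on_clock is enabled; the exact signatures (e.g. whether analyze() is asynchronous) may differ between BSPump versions, and the 'bytes' item is illustrative:

import bspump
import bspump.analyzer

class ByteCounterAnalyzer(bspump.analyzer.Analyzer):
	"""Example analyzer that sums the 'bytes' item of passing events
	and periodically emits the total, without modifying the events."""

	def __init__(self, app, pipeline, id=None, config=None):
		# analyze_on_clock=True calls analyze() periodically (the interval
		# is configurable); this realizes the "once every 60 seconds" trigger
		super().__init__(app, pipeline, analyze_on_clock=True, id=id, config=config)
		self.TotalBytes = 0

	def evaluate(self, context, event):
		# Observe the event; input events are not modified
		self.TotalBytes += event.get('bytes', 0)

	def analyze(self):
		# Emit the measurement result and reset the counter
		print("Total bytes in the last interval:", self.TotalBytes)
		self.TotalBytes = 0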

Time Drift Analyzer

The Time Drift Analyzer is a specific type of analyzer focused on analyzing the difference between the time of event occurrence in the stream and the current time. It describes the system behaviour in case the real-time data stream is delayed, which can serve for system fault identification, or simply as a reference of system behaviour which has to be taken into account during subsequent event processing.

Time Window Analyzer

The Time Window Analyzer is a specific type of analyzer which evaluates incoming events in the context of a time window and observed dimensions, and executes a mathematical operation over them which is given by the input function.

It captures events and arranges them according to time (the current time or the time of the event) and a determined dimension. Over the events in a defined time window it executes various mathematical operations on their fields, such as adding up values (counting values), finding minimum (maximum) values and the like.

Over the value matrix prepared in this way, it subsequently executes other mathematical operations which constitute the input of the analyzer: for instance finding dimensions differing from the others (outliers), grouping dimensions according to values (classification), finding time intervals with empty values (drop to zero) or with specific values (triggers), and the like.

The output of the Time Window Analyzer is a new event resulting from the mathematical operation applied to the values captured in the given time window.
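
To illustrate the idea independently of the BSPump API (the actual TimeWindowAnalyzer class has its own matrix structures and method names), the following standalone sketch builds a dimension-by-time matrix of event counts and detects "drop to zero" intervals:

import numpy as np

# Illustrative events: (timestamp in seconds, dimension)
events = [
	(0, 'deviceA'), (1, 'deviceA'), (2, 'deviceB'),
	(61, 'deviceA'), (122, 'deviceB'), (123, 'deviceB'),
]

window_start, resolution, columns = 0, 60, 3   # three 60-second cells
rows = {'deviceA': 0, 'deviceB': 1}            # observed dimensions
matrix = np.zeros((len(rows), columns))

# Evaluation step: place each event into the cell given by its time and dimension
for timestamp, dimension in events:
	column = (timestamp - window_start) // resolution
	if 0 <= column < columns:
		matrix[rows[dimension], column] += 1

# Analysis step: a mathematical operation over the matrix, here "drop to zero"
for dimension, row in rows.items():
	empty = np.where(matrix[row] == 0)[0]
	if len(empty) > 0:
		print(dimension, "had no events in time window(s):", empty)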

For more information see section Data analysis

Session Window Analyzer

The Session Window Analyzer is a specific type of analyzer which evaluates incoming events in the context of a specific value (e.g. a Session ID) and executes pre-defined operations over them. The session can be limited in time.

For more information see section Data analysis

Sink

The sink object serves as the final destination of an event within the given pipeline. From there, BSPump dispatches/writes the event into the target system.

ElasticSearch Sink

Examples of sink types:

  • RabbitMQ
  • Elasticsearch
  • HTTP server
  • InfluxDB
  • Kafka
  • MySQL
  • Apache Parquet
  • Slack
  • File
  • CSV files

Connection

Connection objects serve for making a connection to another system or service within the infrastructure. They are then located and used within individual pipelines, above all in source, processor and sink objects, which use them e.g. for obtaining data from, or dispatching data to, the service the connection object relates to.

As an example, take the KafkaSource object, which expects a KafkaConnection in order to connect to a Kafka instance within a corporation; the service location, together with all needed login data, is taken from the BSPump application configuration.
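
A sketch of this pairing, assuming the bspump.kafka module with KafkaConnection and KafkaSource; the configuration keys and values shown are illustrative and may differ between versions:

import bspump
import bspump.common
import bspump.kafka

app = bspump.BSPumpApplication()
svc = app.get_service("bspump.PumpService")

# The connection holds the service location (typically taken from configuration)
svc.add_connection(
	bspump.kafka.KafkaConnection(app, "KafkaConnection", config={
		'bootstrap_servers': 'localhost:9092',  # illustrative value
	})
)

class KafkaPipeline(bspump.Pipeline):

	def __init__(self, app):
		super().__init__(app)
		self.build(
			# The source locates the connection by its id "KafkaConnection"
			bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={
				'topic': 'events',  # illustrative topic name
			}),
			bspump.common.PPrintSink(app, self),
		)

svc.add_pipeline(KafkaPipeline(app))
app.run()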

Lookup

Lookups serve for fast data searching in key-value lists. They can then be located and used in pipeline objects (processors and the like). Each lookup requires a statically or dynamically created value list.

A dynamic lookup can thus be changed while the application is running, for instance by values from incoming events. The BSPump internal logic also makes it possible to synchronize lookups among individual pump instances, provided this is specified in the configuration. In this case one pump instance serves as the publisher (master) of the given lookup, while the others can be switched to the subscriber (slave) mode. The actual lists are stored in an internal storage and reused the next time the application starts.

Lookups can be created from static files or directly from events being streamed. As an example, consider a lookup of vehicles by their IDs.
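
A minimal sketch of the vehicle lookup, assuming a dictionary-backed lookup class in bspump.lookup and the PumpService add_lookup/locate_lookup methods; the class and method names (DictionaryLookup, set(), get()) are assumptions that may differ between versions:

import bspump
import bspump.lookup

app = bspump.BSPumpApplication()
svc = app.get_service("bspump.PumpService")

# Statically created key-value list: vehicle ID -> vehicle details
lookup = bspump.lookup.DictionaryLookup(app, id="VehicleLookup")
lookup.set({
	'V123': {'type': 'truck', 'owner': 'ACME'},
	'V456': {'type': 'van', 'owner': 'ACME'},
})
svc.add_lookup(lookup)

class VehicleEnricher(bspump.Processor):
	"""Enricher that widens events with vehicle details found in the lookup."""

	def __init__(self, app, pipeline, id=None, config=None):
		super().__init__(app, pipeline, id=id, config=config)
		self.Lookup = app.get_service("bspump.PumpService").locate_lookup("VehicleLookup")

	def process(self, context, event):
		event['vehicle'] = self.Lookup.get(event.get('vehicle_id'))
		return event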

Event

An event is a data unit which is sent through a pipeline and its processors. The most frequent event form is a dictionary structure, i.e. fields with keys and values. In this form it can easily be recorded, for instance, into an ElasticSearch database.

However, the nature of events can be arbitrary. In such a case it is necessary to ensure that each subsequent processor expects the same data format that the current one produces at its output; this holds true within each pipeline.

ElasticSearch

ElasticSearch is a schema-less database and search engine which makes it possible to store data and quickly retrieve them, together with required aggregations, on individual nodes. Data are kept in the form of JSON documents stored within so-called indexes, which ElasticSearch enables to version. The size of indexes can be changed automatically, depending on the data size or a time window, for the purpose of reaching maximum scalability.

For data storage and for queries over the data, ElasticSearch makes use of a REST API. After a simple configuration, ElasticSearch can run in a cluster, where the individual computing nodes elect the master node by themselves.

By means of a connection object, BSPump enables connection to an ElasticSearch instance, which can mainly be used for storing individual events.

For more information go to ElasticSearch product documentation website: https://www.elastic.co/guide/index.html

bselastic

BSElastic is a useful tool available within the BSPump project in the utils directory.

usage: bselastic [-h] {load_index_template,cleanup,reopen,close,delete} ...

This tool can perform the following operations:

  • load_index_template - loads index templates from a given directory into a specified ElasticSearch instance
  • cleanup - closes and cleans up empty indexes
  • reopen - reopens indexes within a specified time interval
  • close - closes indexes within a specified time interval
  • delete - deletes indexes within a specified time interval

For more information go to GitHub repository: https://github.com/LibertyAces/BitSwanPump/tree/master/utils

Kibana

Kibana is a monitoring and visualization tool running over data obtained from ElasticSearch database. It enables a simple creation of visualizations, dashboards and time graphs capable of displaying the development of more complex metrics and aggregations in time. Visualizations consist of tables, graphs, diagrams or maps creation and thus enable a quick insight into the nature of information contained in data.

For more information go to Kibana documentation website: https://www.elastic.co/guide/en/kibana/index.html

bskibana

BSKibana is a useful tool available within the BSPump project in the utils directory:

usage: bskibana [-h] [--verbose] [--silent] {export,import,decompile,compile,document}

This tool can perform the following operations:

  • export - exports Kibana index from a specified ElasticSearch instance into a file
  • import - imports Kibana index from a file into a specified ElasticSearch instance
  • decompile - decompiles a file with Kibana index into Kibana Object Library (see below for details)
  • compile - compiles Kibana Object Library into a file
  • document - creates documentation
  • rename - renames index pattern IDs according to the referred prefixes of ElasticSearch indexes

For more information go to GitHub repository: https://github.com/LibertyAces/BitSwanPump/tree/master/utils

Kibana Object Library

Kibana Object Library is a directory structure with objects, which originates by splitting the Kibana index into directories according to the types of the individual objects.

Examples of objects:

  • Kibana settings
  • index patterns
  • visualizations
  • dashboards
  • lookups (see below)

By means of the BSKibana tool, the Kibana Object Library makes it possible to export and import the above-mentioned objects into/from files and subsequently version these files. The files are administered by the Git version control system.

Kibana Lookup Plugin

The Kibana Lookup Plugin enables Kibana to create value lookups for specified columns within an index pattern. For instance, for a column called ResponseCode, it is possible to create a lookup which assigns a verbal description (e.g. "OK") to a given value; the description is then displayed in Kibana together with the value.

By means of BSPump the Kibana user can create a lookup and load it into ElasticSearch.