Software engineering

Introduction

BSPump

BSPump is an open-source project written in Python that any developer can use as the basis of a software solution built on real-time data analysis. It forms the basis of the BitSwan product, which further extends the BSPump project for individual use cases with its own microservices, configurations and issues.

For more information go to GitHub repository: https://github.com/LibertyAces/BitSwanPump

Where to start

This repository can be cloned and used as a template for one's own BSPump application: https://github.com/LibertyAces/BitSwanTelco-BlankApp

The following chapters serve as a guide to creating one's own application and describe essential objects, tools and libraries related to BSPump.

Identification and location of pipeline objects

Every top-level BitSwan object, such as a pipeline, connection, lookup or matrix, as well as the members of a pipeline, has its own ID. The ID identifies the object within the BSPump application, so that it can be located and configured specifically.

The ID is implicitly constructed from the object's class name, so an instance of class MyPipeline will have the ID MyPipeline. The ID can be overridden in the constructor of the top-level object (or pipeline member), e.g. MyPipeline(app, "MyPipelineID"), so that it is used for registration and configuration purposes instead of the class name.

Top-level BSPump objects are registered and retrieved using the BSPump service, which can be obtained from the application object via the app.get_service("bspump.PumpService") call. The service takes care of the life-cycle of the top-level objects as well as their execution (in the case of pipelines).

svc.add_pipeline(MyPipeline(app, "MyPipelineID"))

The second parameter in the constructor of a top-level object is an ID that the service uses as a key to store the object. The configuration then also uses this ID instead of the class name (see the "Configuration" section below).

Every object that has access to the service, or to the application object itself, can locate other registered objects. This is useful especially when a processor needs to locate a connection, a processor inside a different pipeline, or another pipeline itself.

processor = svc.locate("MyPipelineID.*InternalSource")

Configuration

Every BitSwan object inside the BSPump application (including the application object itself) can be configured using user-defined configuration options. Every application, pipeline, lookup, processor etc. can access its configuration options via the Configurable object, stored in the self.Config attribute. It is essentially a dictionary with all the defined options.

It is good practice to specify all possible configuration options in the code, together with their default values, by defining a ConfigDefaults dictionary inside the specific class:

class MyServerConnection(Connection):

	ConfigDefaults = {
		'servers': 'localhost:8080',
		'output_queue_max_size': 100,
	}

Please note that ConfigDefaults overrides ConfigDefaults from inherited classes, so the final set is a composition of ConfigDefaults entries. This means that all options from the parent class are preserved in its children, and those specified in the child classes override the options of the parent class.

Now it is ensured that all configuration options will always be present in the Configurable object, accessible via self.Config. The default values can be overridden either by external configuration files linked via the -c switch, or using the config parameter in the object's constructor. Values specified in the constructor always take priority, so in that case a configuration option specified in a file will not be propagated to the object. The configuration file is based on Python's native configuration file parser (for more information, see https://docs.python.org/3/library/configparser.html).

[connection:MyServerConnection]
servers=production:8080
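
The default values can alternatively be overridden directly in the constructor via the config parameter. The following is a minimal sketch using the MyServerConnection class defined above; the ID and the server address are illustrative only:

svc.add_connection(
	MyServerConnection(app, "MyServerConnection", config={
		'servers': 'production:8080',  # overrides the default 'localhost:8080'
	})
)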

The configuration sections specified in configuration files are expected to contain a prefix based on the nature of the BSPump top-level object – whether it is registered inside the application as a lookup, pipeline, connection etc. Processors and other objects that are part of the aforementioned top-level objects are expected to be prefixed by the top-level object's ID. The ID of top-level objects, as well as processors and other native BSPump objects, can be specified in the constructor (see the "Identification and location of pipeline objects" section above).

[pipeline:MySamplePipeline:MyProcessor]
process_events=50

The following table illustrates what the configuration section name looks like for every top-level object and pipeline member:

Object type   Section name
Connection    [connection:<CONNECTION_ID>]
Lookup        [lookup:<LOOKUP_ID>]
Pipeline      [pipeline:<PIPELINE_ID>]
Processor     [pipeline:<PIPELINE_ID>:<PROCESSOR_ID>]
Source        [pipeline:<PIPELINE_ID>:*<SOURCE_ID>]
Sink          [pipeline:<PIPELINE_ID>:<SINK_ID>]

For more information about how the configuration works, refer to the ASAB documentation: https://asab.readthedocs.io/en/latest/asab/config.html.

Application object

The application object constitutes the runnable part of a project based on BSPump. It contains a service called PumpService, based on the open-source ASAB library, which serves for registering pipeline objects (which administer data streams), connection objects and lookup objects.

Pipeline objects in BSPump

The following code illustrates the creation of an application object, the addition of a connection object for connecting to an ElasticSearch database, and the actual pipeline object, which can locate and use the given connection – for instance as a source of events or for their subsequent storage.

app = bspump.BSPumpApplication()
svc = app.get_service("bspump.PumpService")
svc.add_connection(bspump.elasticsearch.ElasticSearchConnection(app, "ESConnection"))
svc.add_pipeline(MyPipeline(app))
app.run()

Pipeline

In BSPump, data are processed in so-called pipelines. Individual pipeline objects work asynchronously and independently of one another (unless a dependency is defined explicitly – for instance on a message source from some other pipeline) and any number of them can be run. Each pipeline is usually in charge of one concrete task.

Pipeline diagram

Each pipeline always contains three types of components:

  • Source
  • Processor
  • Sink

A pipeline can consist of multiple source objects, which makes it possible to connect to multiple data streams. It can also be made up of multiple processors (e.g. several parsers and enrichers) and thus handle a larger number of elementary tasks. Each pipeline contains only one sink object where its output is stored.

The following code shows the definition of a pipeline object made up of one source, one processor and one sink.

class MyPipeline(bspump.Pipeline):
	def __init__(self, app):
		super().__init__(app)
		self.build(
			bspump.socket.TCPStreamSource(app, self),
			bspump.common.JSONParserProcessor(app, self),
			bspump.elasticsearch.ElasticSearchSink(app, self, "ESConnection")
		)

Every pipeline provides a quick way to obtain important objects, such as the application object in the App attribute. Thus, every attribute and method that is available in the application is also available in the pipeline. The following table lists important attributes of the top-level BitSwan objects:

Application    Pipeline            Connection  Source   Processor / Sink  Lookup   Comment
-              Id                  Id          Id       Id                Id       Identification of the object, specified in the constructor.
-              App                 App         App      App               App      Application object.
Loop           via App             via App     via App  via App           via App  Application loop, usable in native asyncio methods.
PubSub (app)   PubSub (pipeline)   -           -        -                 -        Application-level or pipeline-level publish-subscribe object.
-              Sources             -           -        -                 -        List of sources.
-              Processors          -           -        -                 -        List of processors.

The PumpService can be obtained the same way as in the application object (svc = self.App.get_service("bspump.PumpService")). When a processor (usually a source or a sink) needs to locate a connection, lookup, matrix or any other top-level or pipeline object, it can utilize the pump service:

connection = svc.locate_connection("MyConnection")

For instance, MyConnection can be a KafkaConnection, used to obtain a connection that can consume or produce Kafka messages from within the processor. There is also a built-in locate_connection method directly on the pipeline object, which implements the same behavior of locating the PumpService and obtaining the connection:

connection = pipeline.locate_connection(app, "MyConnection")

The PumpService can also be used to obtain processors from other pipelines. The most common case is to locate the InternalSource of a secondary pipeline, into which the primary pipeline can push events:

source = svc.locate("MyPipelineID.*InternalSource")

For more information, see section “Identification and location of pipeline objects” above.

Source

Source is an object designed to obtain data from a predefined input. BSPump contains many universally usable, specific source objects capable of loading data from well-known data interfaces. The BitSwan product further expands these objects by adding source objects directly usable for specific use cases in the given industry field.

Kafka Source

Streaming source vs. triggered source

All sources are created as so-called streaming sources, which means that events can enter them in real time as they are delivered by the input technology. This holds, for instance, for messaging technologies such as Kafka or RabbitMQ.

For other sources, such as SQL databases or files, real-time streaming cannot be set up. Such sources are called triggered sources. Triggered sources have to be triggered by an external event or, for instance, by a repeating timer – e.g. loading data from a database every 5 minutes, or reading data from files on a shared disk where they are recorded by other systems. Triggered sources are not intrinsically real-time – however, they can be considered near real-time sources depending on the input data.
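
As an illustration, a triggered source is attached to a trigger object before being added to the pipeline. The following minimal sketch assumes the bspump.file.FileCSVSource and bspump.trigger.PeriodicTrigger classes as published in the BSPump repository; the file path, the 300-second interval and the NullSink are illustrative only:

import bspump
import bspump.common
import bspump.file
import bspump.trigger

class MyTriggeredPipeline(bspump.Pipeline):

	def __init__(self, app, pipeline_id=None):
		super().__init__(app, pipeline_id)
		self.build(
			# The CSV file is read only when the trigger fires,
			# here every 300 seconds (5 minutes)
			bspump.file.FileCSVSource(
				app, self, config={'path': './data.csv'}
			).on(bspump.trigger.PeriodicTrigger(app, 300)),
			bspump.common.NullSink(app, self),
		)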

The BSPump for instance contains the following types of source objects:

  • RabbitMQ
  • HTTP client
  • HTTP server
  • Kafka
  • MySQL
  • TCP
  • File
  • CSV files
  • JSON files

Source objects can easily be created and thus adjusted to one's own needs and data sources.

Processor

The main component of the BSPump architecture is the so-called processor. This object modifies, transforms and enriches events. Moreover, it is capable of calculating metrics and creating aggregations, detecting anomalies or reacting to known as well as unknown patterns of system behavior.

Processors differ in their functions and are all arranged in a predefined sequence within pipeline objects. As regards working with data events, each pipeline has its own unique task.

Processors in a pipeline

BSPump is created as a modular, open system. It is therefore possible for a knowledgeable user to create and set up his/her own processor. The user can then easily focus on creating the algorithm (the computing method) inside the processor, while the object itself is administered by the internal mechanisms of the BSPump project, from which the user is shielded. Creating a simple processor usually takes a few minutes, which makes it possible for the user to focus on the concrete task within the project the processor is supposed to execute.
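
A minimal sketch of such a custom processor is shown below. It assumes the standard bspump.Processor base class, whose process method receives every event and returns it (possibly modified) to the next component in the pipeline; the attribute added here is purely illustrative:

import bspump

class MyEnrichingProcessor(bspump.Processor):

	def process(self, context, event):
		# Add an illustrative attribute to the event and pass it on
		event["processed_by"] = "MyEnrichingProcessor"
		return event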

Parser

Parser is a type of processor which decomposes incoming data blocks into smaller logical entities – i.e. events containing key and value fields – so that they can be processed more conveniently and their data structure widened in the subsequent processors of the pipeline. The parser output consists of structured data in the form of key and value fields (so-called dictionaries) with some added value for the user.

Parser

Decomposing data into smaller blocks by means of pre-defined rules is done so that the data can be interpreted, administered and processed more easily by subsequent processors.

Examples of parsers:

  • JSON parser - converts data from JSON format into events in dictionary format
  • Avro parser - converts data from Avro format into specific events
  • XML parser - converts data from XML format into specific events

Example of a parser from the telecommunication field:

  • XDR S1 - converts data records containing information from the S1 mobile signalling protocol into specific events

The openness of BitSwan gives users the possibility to write their own parsers, so that new data sources can be added faster and the advantage of transforming data into a structured form can be gained.

Filter

Filter is a type of processor that allows filtering of incoming events based on information associated with them. Filters may either drop events that do not match the filter conditions, or pass them on to the same or a different pipeline with flags, labels etc. Other processors in the pipelines should then behave according to the flag specified in the event.

ContentFilter is part of the bspump.filter module and its purpose is to filter incoming events based on a MongoDB-like query, specified in the query attribute of the constructor. For instance, the following query expects the attributes user and num to be present in the incoming event, and filters users based on their ID (user_0, user_2, user_10) and numbers based on their size (the number should be greater than or equal to 500):

"$or":
[
	{
		"user":
		{
			'$in':["user_0", "user_2", "user_10"]
		}
	}, 
	{
		"num":
		{
			"$gte": 500
		}
	}
]

A specific implementation of the filter should override the generic ContentFilter class and its on_hit and on_miss methods, which process the filtered events that either matched (on_hit) or did not match (on_miss) the query. The behavior of ContentFilter is similar to the query feature in SQLstream. For an implementation of these methods and a sample Mongo-like query, see the example on GitHub: https://github.com/LibertyAces/BitSwanPump/blob/master/bspump-content-filter.py
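
A rough sketch of such an override follows, assuming the bspump.filter.ContentFilter base class and the common BSPump convention that a processor returning None drops the event; the tag attribute is illustrative only:

import bspump.filter

class MyContentFilter(bspump.filter.ContentFilter):

	def on_hit(self, context, event):
		# The event matched the query: label it and pass it on
		event["tag"] = "hit"
		return event

	def on_miss(self, context, event):
		# The event did not match the query: drop it
		return None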

TimeDriftFilter, also part of the bspump.filter module, drops events whose @timestamp attribute (in seconds) differs from the current time by more than the configured max_time_drift (in seconds).
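
For illustration, the maximum allowed drift could be configured as follows, assuming the filter keeps its default class-name ID inside a pipeline called MyPipeline:

[pipeline:MyPipeline:TimeDriftFilter]
max_time_drift=300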

AttributeFilter, also in the bspump.filter module, filters events based on a specific attribute present in the event (which is expected to be a Python dictionary).

Enricher

Enricher is a type of processor which looks up information related to a specific event item and adds the result to the event as a new additional item. In this way, the data structure is widened with other useful information. When this information becomes known and common, it is called a dimension. The enricher effectively increases the value of the data.

Enricher

Example of an enricher:

  • GEO IP enricher – adds location information to events according to the device IP addresses available in them, based on a table containing IP addresses and their GEO positions (countries, regions, cities)

The openness of BitSwan gives users the possibility to write their own enrichers, which can be used immediately to enrich data with new dimensions that bring additional value (e.g. for map visualisations, further processing etc.).

Generator

Generator is a type of processor which decomposes large data records into multiple events. It is mainly used for data records made up of many various measurements or aggregated values. The generator expects a large generic record at its input and creates separate data events.

Example of a generator:

  • CSV generator - expects a CSV data file at its input and creates a specific event for each line – the columns become keys in the resulting event

Transformer

Transformer is a processor which transforms one type of data record into another. This enables the user to unify data in a simple way. For more information see lookups (transformers typically make use of lookups to manage the unification process).

Router

Router is an advanced type of processor which is used to dispatch events into other pipeline objects.

The router analyzes incoming events and is capable of executing one of the following operations on them:

(a) transferring them into other existing pipeline objects depending on some event value

(b) creating a new event and dispatching it into other existing pipeline objects

The routing process is based on the internal routing logic of the BSPump project, which lets the user decide whether to clone, decompose or finish events, or create new ones. The routing logic offers extended possibilities for creativity when working with events.

Router

A router-type processor is suitable wherever incoming data need to be processed by some other pipeline object. In such cases routers are used for: (a) decomposing or cloning events into new pipelines, where they can be processed; (b) creating new events (depending on input event values) and dispatching them into other pipelines for subsequent processing. The new or decomposed events then enter other pipelines where their subsequent processing starts. One event can enter multiple internal pipeline objects.

Analyzer


Analyzer is a specific type of processor which behaves as an observer. Unlike the other processors, it does not change input events but performs analysis over them. Its output can be a concrete measurement or the result of a mathematical operation over defined event items – for instance, it can sum the values of specific items.

Analyzer

The analyzer then dispatches the measurement result (e.g. into some other pipeline) in the form of a new event, depending on a defined trigger (for instance once every 60 seconds).

The resulting events, which constitute the observations of the analyzer, can subsequently form an aggregated view of the data stream or trigger more complex processing of it.

The nature of the so-called triggers for obtaining results from the analyzer can differ. The analyzer can be triggered by some other pipeline object, by elapsed time or by some other condition within the analyzer. For the time trigger, two types of time are used: the current time and the time of the event being streamed (the latter is usually contained in one of the event items).

The analyzer is a universal BitSwan component and brings high value to users – they can quickly define new measurements or computing conditions while setting trigger conditions creatively.
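
The following is a rough sketch of a custom analyzer, assuming the bspump.analyzer.Analyzer base class with an evaluate() method (called for every event) and an analyze() method (called by the configured trigger), as found in the public BSPump sources; the counting logic is purely illustrative:

import bspump.analyzer

class EventCountingAnalyzer(bspump.analyzer.Analyzer):

	def __init__(self, app, pipeline, id=None, config=None):
		super().__init__(app, pipeline, id=id, config=config)
		self.Counter = 0

	def evaluate(self, context, event):
		# Observe the event without modifying it
		self.Counter += 1

	def analyze(self):
		# Called by the trigger; publish or log the measurement
		print("Events observed:", self.Counter)
		self.Counter = 0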

Time Drift Analyzer

Time Drift Analyzer is a specific type of analyzer focused on analyzing the difference between the time of event occurrence in the stream and the current time. It describes system behaviour in cases where the real-time data stream is delayed, which can serve to identify system faults, or simply as a reference on system behaviour that has to be taken into consideration in the course of subsequent event processing.

Time Window Analyzer

Time Window Analyzer is a specific type of analyzer which evaluates incoming events in the context of a time window and observed dimensions, and executes over them a mathematical operation given by the input function.

It captures events and arranges them according to time (the current time or the time of the event) and the determined dimension. Over the events and their fields in a defined time window, it executes various mathematical operations, such as summing values (or counting them), finding minimum (maximum) values and the like.

Over the value matrix prepared in this way, it subsequently executes other mathematical operations which constitute the input of the analyzer – for instance finding dimensions differing from the others (outliers), grouping dimensions according to values (classification), finding time intervals with empty values (drop to zero) or specific values (triggers) and the like.

The output of the Time Window Analyzer is a new event resulting from the mathematical operation applied to values captured in the time window given.

For more information see section Data analysis

Session Window Analyzer

Session Window Analyzer is a specific type of analyzer which evaluates incoming events in the context of a specific value (e.g. a Session ID) and executes pre-defined operations over them. The session can also be limited in time.

For more information see section Data analysis

Sink

The sink object serves as the final event destination within the given pipeline. From there, the event is dispatched/written into the target system by BSPump.

ElasticSearch Sink

Examples of sink types:

  • Rabbit
  • Elasticsearch
  • HTTP server
  • InfluxDB
  • Kafka
  • MySQL
  • Apache Parquet
  • Slack
  • File
  • CSV files

Connection

Connection objects serve to establish a connection to some other component or service within the infrastructure. They are then located and used within individual pipelines, above all in source, processor and sink objects, which use them e.g. for obtaining data from, or dispatching data to, the service the connection object relates to.

As an example, the KafkaSource object can be mentioned, which expects a KafkaConnection in order to connect to a Kafka instance within a corporation, where the service location together with all required login data are available from the BSPump application configuration.
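
A rough sketch of this arrangement follows, assuming the bspump.kafka.KafkaConnection and bspump.kafka.KafkaSource classes from the BSPump Kafka module; the broker address, topic name and NullSink are illustrative only:

import bspump
import bspump.common
import bspump.kafka

class MyKafkaPipeline(bspump.Pipeline):

	def __init__(self, app, pipeline_id=None):
		super().__init__(app, pipeline_id)
		self.build(
			# The source locates the connection by its ID "KafkaConnection"
			bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={
				'topic': 'my_topic',  # illustrative topic name
			}),
			bspump.common.NullSink(app, self),
		)

app = bspump.BSPumpApplication()
svc = app.get_service("bspump.PumpService")
svc.add_connection(
	bspump.kafka.KafkaConnection(app, "KafkaConnection", config={
		'bootstrap_servers': 'localhost:9092',  # illustrative broker address
	})
)
svc.add_pipeline(MyKafkaPipeline(app))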

Lookup

Lookups serve for fast data searching in key-value lists. They can subsequently be located and used in pipeline objects (processors and the like). Each lookup requires a statically or dynamically created list of values.

A dynamic lookup can thus be changed while the application is running – for instance by values from incoming events. The internal BSPump logic also makes it possible to synchronize lookups among individual pump instances, provided this is specified in the configuration. In this case one pump instance can serve as a publisher (master) of the given lookups, while the others can be switched to subscriber (slave) mode. The actual lists are then stored in an internal storage and used the next time the application is started.

Lookups can be created from static files or directly from the events being streamed. An example is a lookup of vehicles according to their IDs.

Matrix

Matrices are another category of BSPump top-level objects, next to pipelines, lookups and connections. Matrices are used to create storage with defined rows and columns that is shared among other objects, such as analyzers and other types of processors.

The core of the matrix is a NumPy multidimensional fixed-size array with preallocated memory. The content of the cells in the matrix must be predefined as either integer, float or string. The matrix also contains additional structures: RowMap and RevRowMap to translate numerical indices of the matrix into specific IDs, Storage to keep associated Python structures such as lists and dictionaries with relevant information for each element in the matrix (e.g. personal information, if one element in the matrix is a person), and ClosedRows to remember rows to be eventually deleted.

An example usage of a matrix can be seen in the GeoAnalyzer, where it is used to store geographical elements such as the locations of persons, vehicles etc. The matrix can then be analyzed and used by other processors within the BSPump application.

Inside the application, the add_matrix and locate_matrix methods are available, similar to the add_pipeline, add_lookup etc. methods used to store and handle other top-level objects.

Event

An event is a data unit sent through a pipeline and its processors. The most frequent form of an event is a dictionary – i.e. fields with keys and values. In this form it can easily be written, for instance, into an ElasticSearch database.

However, the nature of events can be arbitrary. In such a case it is necessary to ensure that each subsequent processor expects the same data format that the current one produces at its output – this holds true within each pipeline.
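
For illustration, a typical event passed between processors might look like the following Python dictionary (all keys and values are purely illustrative):

event = {
	"@timestamp": 1590000000,   # event time in seconds
	"user": "user_0",
	"num": 42,
}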

ElasticSearch

ElasticSearch is a schema-less database and search engine which makes it possible to store data and quickly retrieve it, together with the required aggregations, on individual nodes. Data are kept in the form of JSON documents stored within so-called indexes, for which ElasticSearch supports versioning. The size of indexes can be changed automatically to reach maximum scalability – depending on data size or a time window.

For storing data and querying it, ElasticSearch provides a REST API. After simple configuration, ElasticSearch can be run in a cluster where the individual computing nodes elect a master node by themselves.

Using a connection object, BSPump can connect to an ElasticSearch instance, which is mainly used for storing individual events.

For more information go to ElasticSearch product documentation website: https://www.elastic.co/guide/index.html

bselastic

BSElastic is a useful tool available in the utils directory of the BSPump project.

usage: bselastic [-h] {load_index_template,cleanup,reopen,close,delete} ...

This tool can perform the following operations:

  • load_index_template - loads index templates from a given directory into a specified ElasticSearch instance
  • cleanup - closes and cleans up empty indexes
  • reopen - reopens indexes within a specified time interval
  • close - closes indexes within a specified time interval
  • delete - deletes indexes within a specified time interval

For more information go to GitHub repository: https://github.com/LibertyAces/BitSwanPump/tree/master/utils

Kibana

Kibana is a monitoring and visualization tool running over data obtained from an ElasticSearch database. It enables simple creation of visualizations, dashboards and time graphs capable of displaying the development of more complex metrics and aggregations over time. Visualizations include tables, graphs, diagrams and maps, and thus enable a quick insight into the nature of the information contained in the data.

For more information go to Kibana documentation website: https://www.elastic.co/guide/en/kibana/index.html

bskibana

BSKibana is a useful tool available in the utils directory of the BSPump project:

usage: bskibana [-h] [--verbose] [--silent] {export,import,decompile,compile,document}

This tool can perform the following operations:

  • export - exports Kibana index from a specified ElasticSearch instance into a file
  • import - imports Kibana index from a file into a specified ElasticSearch instance
  • decompile - decompiles a file with Kibana index into Kibana Object Library (see below for details)
  • compile - compiles Kibana Object Library into a file
  • document - creates documentation
  • rename - renames index pattern IDs to the referred ElasticSearch index prefixes

For more information go to GitHub repository: https://github.com/LibertyAces/BitSwanPump/tree/master/utils

Kibana Object Library

Kibana Object Library is a directory structure with objects, created by splitting the Kibana index into directories according to the types of the individual objects.

Examples of objects:

  • Kibana settings
  • index patterns
  • visualizations
  • dashboards
  • lookups (see below)

By means of the BSKibana tool, the Kibana Object Library makes it possible to export and import the above-mentioned objects into and out of files and to version these files subsequently. These files are managed by the Git version control system.

Kibana Lookup Plugin

The Kibana Lookup Plugin enables Kibana to create value lookups for specified columns within an index pattern. For instance, for a column called ResponseCode it is possible to create a lookup which assigns a verbal description (e.g. "OK") to a given value – the description is subsequently displayed in Kibana together with the value.

By means of BSPump the Kibana user can create a lookup and load it into ElasticSearch.