Data analytics in the BitSwan product
Stream analytics
BitSwan is designed for fast and precise real-time data analysis. For this purpose it connects to data streams from various sources, whether it be Kafka, RabbitMQ, Syslog or a growing number of files in data storage.
Apart from the possibility of defining one's own pipeline processors (e.g. for processing, parsing and enriching data) and metrics, the BSPump project provides pre-defined real-time analysis components, which can be used in a concrete solution within the BitSwan product.
Stream analytics output can, for instance, be used for notifications when configured levels are exceeded. Detecting such boundary-value (outlier) exceedances, as part of anomaly detection, makes it possible to find individual metrics that differ from the common behaviour of metrics in the given grouped data. It is an elegant mechanism that detects problems in the system automatically. Anomaly detection outputs are stored in an ElasticSearch database within so-called alarm indexes, where current and historical detections can be browsed and filtered.
Dimensions
A dimension is information that connects individual data events. It serves for correct classification and description of data, so that subsequent analyses, visualisations and descriptions make sense. Dimensions can be, for instance, temporal, geographical or technological.
An example of a time dimension is the time of the event's occurrence in the system, carried by the event itself, or the duration of the phenomenon by which the event was created. Geographical dimensions can be, for instance, geographical coordinates looked up from IP addresses, countries of origin, etc. An example of a technological dimension is the type of device on which the event originated.
Dimensions are used for correlating individual events in data matrices: geographical dimensions, for example, enable grouping events by concrete cities, regions and countries. By means of session dimensions, individual events can be assigned to concrete sessions and the behaviour of given users can be tracked.
Dimensions should be as general as possible and can contain multiple values in a list. For example, a dimension named “ip” can contain both the originating and the destination IP address. Metrics can then be calculated on both values or on just one of them (see Metrics below).
Compared to the relational database world, dimensions can be seen as indexed attributes by which searching, grouping and other operations happen. Dimensions are shared among different data sources and data sets (tables).
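As an illustration, a single event carrying several dimensions might look as follows; the field names and values are hypothetical, not a fixed BitSwan schema:

```python
# A hypothetical event enriched with dimensions; field names are illustrative.
event = {
    "@timestamp": 1600000000,            # time dimension: occurrence of the event
    "ip": ["192.168.1.10", "10.0.0.5"],  # general dimension holding both source and destination
    "country": "CZ",                     # geographical dimension looked up from the IP
    "device_type": "router",             # technological dimension
    "user": "alice",                     # session dimension
}

# Grouping by dimensions: events sharing the same values correlate.
group_key = (event["country"], event["device_type"])
```

Because the “ip” dimension is a list, a metric can later be computed over both addresses or restricted to one of them.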
Metrics and correlations
Metrics are measured values indexed by a certain dimension or dimensions.
For instance, “the number of unsuccessfully logged-in users on a given server” is a measured value, a metric. The dimensions here are the “user” and the “server”, by which metrics are calculated or symptoms detected.
When a metric exceeds a threshold, a symptom may be produced. Metrics, along with symptoms, are produced in in-memory analyzers and passed to the AnomalyAnalyzer. One symptom can be built on top of several metrics based on different dimensions and thus create correlations, or complex symptoms.
For instance, when the “number of unsuccessfully logged-in users” exceeds 10 during the last minute, a symptom with type “user_group_login_fail” is produced and passed to BitSwan Anomaly. Or, when the “number of unsuccessfully logged-in users” exceeds 10 during the last minute and a “server reboot” happens during that time, a complex symptom / correlation with type “user_server_reboot” is produced, with the server as the key dimension (see below).
As mentioned above, the calculation of metrics and the detection of symptoms happen in in-memory analyzers. The analyzers contain matrices organized by dimensions and time on different axes (X - dimensions, Y - time). When the analyze function runs, the matrix is searched through, metrics are calculated and symptoms are produced. Considering the example above, the metric of “unsuccessfully logged-in users” is calculated for all users (one dimension) on a given server, while the “server reboot” is detected specifically for the given server (another dimension).
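The example can be sketched with a plain numpy matrix; the layout (users on one axis, time buckets on the other) is a simplification of the analyzers' internal matrices, and all names and numbers are illustrative:

```python
import numpy as np

# Rows = users (one dimension), columns = time buckets within the last minute.
# Each cell counts unsuccessful logins of one user in one bucket.
failed_logins = np.array([
    [3, 4, 5],   # user "alice"
    [0, 1, 0],   # user "bob"
])

# The metric: unsuccessful logins over the whole window, per user and in total.
per_user = failed_logins.sum(axis=1)
total = int(per_user.sum())

# When the metric exceeds the threshold, a symptom is produced.
THRESHOLD = 10
if total > THRESHOLD:
    symptom = {
        "@timestamp": 1600000000,
        "type": "user_group_login_fail",
        "server": "srv-01",  # illustrative key dimension
    }
```

In the real analyzers the matrix is maintained incrementally as events stream in, and the analyze function performs the summation on a timer or on demand.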
Symptom
Symptoms can be either simple (e.g. a user login was not successful) or complex (e.g. logins for 80 % of users were not successful during the last 20 minutes). Such symptoms should be produced by analytical microservices before they enter the AnomalyAnalyzer.
Every symptom should contain the following fields:
{
"@timestamp": UNIX_TIMESTAMP,
"type": STRING,
"key_dimension1": ...
"key_dimension2": ...
...
"other_dimension": ... (OPTIONAL)
...
"status": future/open/closed (OPTIONAL)
}
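A concrete symptom following this structure might look like this; the values and the choice of key dimensions are illustrative:

```python
# An illustrative symptom following the structure above; "user" and "app_id"
# play the role of key dimensions here, "server" is an optional other dimension.
symptom = {
    "@timestamp": 1600000000,
    "type": "user_login_failed",
    "user": "alice",         # key dimension
    "app_id": "crm-portal",  # key dimension
    "server": "srv-01",      # other dimension (optional)
    "status": "open",        # optional
}

# Every symptom must carry at least a timestamp and a type.
required = {"@timestamp", "type"}
assert required <= set(symptom)
```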
The type of the symptom determines which anomaly it is assigned to: symptoms are assigned to anomalies with the same type. For instance, a user_login_failed symptom is assigned to a user_login_failed anomaly.
Key dimensions then identify both the symptom and the anomaly specifically. For instance, the key dimension for user_login_failed can be either a specific user, or a specific ID of an application.
Other dimensions serve for further identification, but they do not influence the assignment to a specific anomaly.
Which dimensions are key and which are not should be defined in the configuration (see “Configuration” section below).
Anomaly
Anomalies are composed of multiple symptoms with the same type and key dimensions. An anomaly is opened when the first such symptom comes, and closed when one of the following strategies is configured and satisfied:
- a symptom with the status attribute equal to closed comes (default)
- a certain amount of time has passed since the last symptom with status equal to open came
The strategies are defined for every anomaly type in the configuration (see “Configuration” section below), or in a custom anomaly object based on the abstract Anomaly class.
The structure of the anomaly looks as follows:
{
"@timestamp": UNIX_TIMESTAMP,
"ts_end": UNIX_TIMESTAMP,
"type": STRING,
"status": STRING,
"symptoms": ARRAY,
"key_dimension1": ...
"key_dimension2": ...
}
@timestamp is the timestamp of the first symptom.
Both open and closed anomalies are persistently stored in ElasticSearch. When an anomaly is closed, its _id field in ElasticSearch contains the backup keyword.
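The lifecycle described above can be modelled in a few lines of plain Python; this is a simplified sketch of the default closing strategy, not the actual bspump implementation:

```python
# Simplified model: symptoms with the same type and key dimensions are grouped
# into one anomaly; a symptom with status "closed" closes it (default strategy).
open_anomalies = {}
closed_anomalies = []

def on_symptom(symptom, key_dimensions=("user",)):
    key = (symptom["type"],) + tuple(symptom[d] for d in key_dimensions)
    anomaly = open_anomalies.get(key)
    if anomaly is None:
        anomaly = {
            "@timestamp": symptom["@timestamp"],  # timestamp of the first symptom
            "type": symptom["type"],
            "status": "open",
            "symptoms": [],
        }
        open_anomalies[key] = anomaly
    anomaly["symptoms"].append(symptom)
    if symptom.get("status") == "closed":
        anomaly["status"] = "closed"
        anomaly["ts_end"] = symptom["@timestamp"]
        closed_anomalies.append(open_anomalies.pop(key))
    return anomaly

on_symptom({"@timestamp": 100, "type": "user_login_failed", "user": "alice"})
on_symptom({"@timestamp": 160, "type": "user_login_failed", "user": "alice", "status": "closed"})
```

In the real system the closed anomalies would additionally be persisted to ElasticSearch.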
Analyzers
Stream analyzer
Stream analyzer is a type of analyzer that is an integral part of a pipeline and produces its own events depending on the data stream.
Time Drift Analyzer
Time Drift Analyzer is a specific type of analyzer focused on analysing the time difference between an event's occurrence in the stream and the current time, i.e. the time of its passage through the stream analyzer. This analyzer describes the behaviour of the system when the real-time data stream is delayed, which can serve for identifying system faults, or simply as a reference of system behaviour that has to be taken into consideration during subsequent event processing.
For more general information about analyzers please read the section Software Engineering.
Time Window Analyzer
Time Window Analyzer is a specific type of analyzer operating on a TimeWindowMatrix. It evaluates incoming events in the context of a time window and the observed dimensions, and performs a mathematical operation over them, determined by the input function.
It captures events and arranges them according to a time dimension (e.g. current time or event time), as well as according to some other selected dimension, e.g. IP address. Over the events in a defined time window it performs various mathematical operations on their fields, such as summing values, finding minimum and maximum values, counting values, computing standard deviations and the like.
Over the value matrix prepared this way it subsequently performs further mathematical operations that constitute the entry into the actual analysis, e.g. searching for dimensions that differ from the others (outliers), grouping dimensions according to values (classification), searching for time intervals with empty values (drop to zero) or specific values (triggers) and the like.
The output of the Time Window Analyzer is a new event, the result of a mathematical operation applied to the values captured in the given time window.
BitSwan also handles events that have only just arrived but are already beyond the observed time window (late events), as well as events that have not occurred yet (early events). These undesirable states are evaluated and the user is provided with feedback for further analysis.
Parameters of the constructor: app, pipeline, matrix_id=None, dtype='float_', columns=15, analyze_on_clock=False, resolution=60, start_time=None, clock_driven=True, id=None, config=None. If matrix_id is present, the pre-created matrix will be ‘located’. analyze_on_clock enables calling the analyze() function by a timer; 'analyze_period': time_in_seconds should be added to config. The default period is 1 minute.
Session Analyzer
Session Analyzer is a specific type of analyzer operating on a SessionMatrix. It evaluates incoming events in the context of a specific value (e.g. Session ID) and performs pre-defined operations over them. The time validity of a session can be limited.
Parameters of the constructor: app, pipeline, matrix_id=None, dtype='float_', analyze_on_clock=False, id=None, config=None. dtype is passed to the Matrix if needed. If matrix_id is present, the pre-created matrix will be ‘located’. analyze_on_clock enables calling the analyze() function by a timer; 'analyze_period': time_in_seconds should be added to config. The default period is 1 minute.
Geo Analyzer
Geo Analyzer is an analyzer which operates on a map projection with a given GPS bounding box (GeoMatrix is an implementation of this concept).
Parameters of the constructor: app, pipeline, matrix_id=None, dtype='float_', analyze_on_clock=False, bbox=None, resolution=5, id=None, config=None (see GeoMatrix and Session Analyzer).
AnomalyAnalyzer
AnomalyAnalyzer analyzes symptoms of anomalies to assign them to anomalies based on key dimensions within the symptom, which are defined in the “key_dimensions” configuration option for every symptom type.
Matrix
For more general information about matrices please read the section Software Engineering.
Session Matrix
Session Matrix is a specific matrix which can aggregate information of different types in its columns. It works with the Session Analyzer.
Initialization
Parameters in the constructor: app, dtype='float_', id=None, config=None. All of them except dtype are described above.
SessionMatrix has columns with different names and types. For instance, you may want to keep 3 columns in the matrix: timestamp as an integer, tag as a string and some_numbers as a float submatrix with 2 rows and 2 columns. That is what dtype is for: it has to be a list in the form dtype=[('timestamp', 'i8'), ('tag', 'U20'), ('some_numbers', '(2,2)f8')]. The first tuple member is a name, the second is a type. i8 means an 8-byte (64-bit) integer, U20 means a unicode string of at most 20 characters and (2,2)f8 means an 8-byte (64-bit) float matrix of size (2, 2).
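The dtype list maps directly onto a numpy structured array, which is what the matrix holds internally; the sketch below only demonstrates the dtype from the example:

```python
import numpy as np

# The dtype from the example: an integer timestamp, a short unicode tag
# and a 2x2 float submatrix per row.
dtype = [('timestamp', 'i8'), ('tag', 'U20'), ('some_numbers', '(2,2)f8')]

# Allocate a small structured array, as the matrix does internally.
array = np.zeros(4, dtype=dtype)
array[0]['timestamp'] = 1600000000
array[0]['tag'] = 'session-a'
array[0]['some_numbers'] = [[1.0, 2.0], [3.0, 4.0]]
```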
Attributes
Array is the numpy matrix; it consists of rows, columns and cells.
N2IMap is a structure helping to translate a row name (unique id) to a numeric array index.
I2NMap provides the index-to-row-name translation.
ClosedRows is a set containing temporarily unused row indices.
Functions
row_index = SessionMatrix.add_row(row_name) adds a new row with the specific id and returns the numeric index of the row in the matrix.
row_index = SessionMatrix.get_row_index(row_name) translates the row name to the index in the matrix; None if it does not exist.
row_name = SessionMatrix.get_row_name(row_index) translates the row index to the name or id; None if it does not exist.
SessionMatrix.close_row(row_index) adds the index to ClosedRows.
SessionMatrix.flush() deletes closed rows from the matrix.
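The name-to-index bookkeeping behind these functions can be illustrated with a minimal stand-in class; it mimics only N2IMap, I2NMap and ClosedRows, while the real SessionMatrix also manages the underlying numpy array:

```python
class TinySessionMatrix:
    """Minimal stand-in illustrating SessionMatrix row bookkeeping (not the real bspump class)."""

    def __init__(self):
        self.N2IMap = {}         # row name (unique id) -> numeric index
        self.I2NMap = {}         # numeric index -> row name
        self.ClosedRows = set()  # temporarily unused row indices
        self._next_index = 0

    def add_row(self, row_name):
        row_index = self._next_index
        self._next_index += 1
        self.N2IMap[row_name] = row_index
        self.I2NMap[row_index] = row_name
        return row_index

    def get_row_index(self, row_name):
        return self.N2IMap.get(row_name)   # None if it does not exist

    def get_row_name(self, row_index):
        return self.I2NMap.get(row_index)  # None if it does not exist

    def close_row(self, row_index):
        self.ClosedRows.add(row_index)

    def flush(self):
        # Delete closed rows from the maps.
        for row_index in self.ClosedRows:
            del self.N2IMap[self.I2NMap.pop(row_index)]
        self.ClosedRows.clear()
```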
Time Window Matrix
Time Window Matrix is a specific matrix which can aggregate events with a time dimension in its columns. It works with the Time Window Analyzer.
Initialization
Parameters in the constructor (with defaults): app, dtype='float_', resolution=60, columns=15, clock_driven=True, start_time=None, id=None, config=None.
TimeWindowMatrix has time-dimension columns where events with a timestamp are aggregated. You may notice that, unlike in SessionMatrix, dtype is not a list: the first number represents the number of columns (first member of the tuple) and the second the number of cells inside each column. resolution is the number of seconds in each time column of the matrix. clock_driven=True means that the time window will periodically move forward by adding the newest column and deleting the oldest one; the period of advancing is resolution. start_time is the starting timestamp; if it is None, the start time will be the current time.
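How a timestamp maps to a time column can be sketched with plain arithmetic; the numbers are the defaults from above and the function is a simplification, not the real implementation:

```python
# With resolution=60 and columns=15 the window covers 15 minutes;
# each timestamp maps to exactly one column.
RESOLUTION = 60  # seconds per column
COLUMNS = 15
start_time = 1600000000                        # oldest edge of the window (illustrative)
end_time = start_time + RESOLUTION * COLUMNS   # newest edge

def get_column(timestamp):
    """Return the column index for a timestamp (seconds), None if outside the window."""
    if not (start_time <= timestamp < end_time):
        return None
    return (timestamp - start_time) // RESOLUTION
```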
Attributes
(same as SessionMatrix), plus additional ones:
Start is the newest timestamp, in seconds.
End is the oldest timestamp, in seconds.
Resolution is the number of seconds in each column.
Dimensions is (number of columns, cells in column).
WarmingUpCount is an array indicating whether an added row is ‘old’ enough to be analyzed.
Functions
(same as SessionMatrix), plus:
column_index = TimeWindowMatrix.get_column(timestamp) returns the column index of the cell the timestamp (in seconds) belongs to; None if it is outside the window.
TimeWindowMatrix.advance(target_timestamp) possibly moves the time window forward.
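Advancing can be illustrated with numpy: the oldest column is dropped and a zeroed newest column is appended until the target timestamp fits into the window. A simplified sketch with illustrative numbers, not the real advance implementation:

```python
import numpy as np

RESOLUTION = 60
matrix = np.arange(15, dtype='float_').reshape(1, 15)  # 1 row, 15 time columns
start = 1600000000
end = start + RESOLUTION * 15

def advance(target_timestamp):
    """Move the window forward while the target timestamp is beyond its newest edge."""
    global matrix, start, end
    while target_timestamp >= end:
        # Drop the oldest column, append a zeroed newest column.
        matrix = np.hstack([matrix[:, 1:], np.zeros((matrix.shape[0], 1))])
        start += RESOLUTION
        end += RESOLUTION

# A timestamp one minute beyond the window forces the window to shift.
advance(1600000000 + 60 * 16)
```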
Geo Matrix
Geo Matrix is a specific matrix which can aggregate events on a map within a bounding box. It works with the Geo Analyzer.
Initialization
Parameters in the constructor (with defaults): app, dtype='float_', bbox=None, resolution=5, id=None, config=None.
GeoMatrix is a fixed-size matrix representing the map. Real GPS coordinates are projected to map coordinates using an equirectangular transformation. For dtype, see the SessionMatrix section. bbox is a dictionary with the map's extreme points: max_lat, min_lat, min_lon, max_lon. If it is None, European coordinates are used. resolution is the size of a cell in kilometers.
Attributes
Array is the numpy matrix; it consists of rows, columns and cells.
BBox is the bounding box (see the initialization section).
Resolution is the size of a cell in kilometers.
MembersToIds
SizeWidth is the width of the map in km.
SizeHeight is the height of the map in km.
MapWidth is the number of rows in the matrix (alias of Array.shape[0]).
MapHeight is the number of columns in the matrix (alias of Array.shape[1]).
Functions
is_in_boundaries(lat, lon) tests whether the coordinates fall into the map; returns a boolean.
row, column = project_equirectangular(lat, lon) translates real coordinates to map coordinates, using the equirectangular transformation.
lat, lon = inverse_equirectangular(row, column) translates map coordinates into GPS coordinates.
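A sketch of these operations under an assumed European bounding box and 5 km resolution; the constants and scaling are illustrative, while the real GeoMatrix derives its own projection from bbox and resolution:

```python
import math

# Assumed bounding box roughly covering Europe and an assumed 5 km cell size.
BBOX = {'min_lat': 35.0, 'max_lat': 71.0, 'min_lon': -10.0, 'max_lon': 40.0}
RESOLUTION_KM = 5.0
EARTH_RADIUS_KM = 6371.0
KM_PER_DEG = math.pi * EARTH_RADIUS_KM / 180.0  # kilometers per degree of latitude
# Reference latitude for the equirectangular projection (middle of the box).
REF_LAT = math.radians((BBOX['min_lat'] + BBOX['max_lat']) / 2.0)

def is_in_boundaries(lat, lon):
    """Test whether the coordinates fall into the map."""
    return (BBOX['min_lat'] <= lat <= BBOX['max_lat']
            and BBOX['min_lon'] <= lon <= BBOX['max_lon'])

def project_equirectangular(lat, lon):
    """Translate GPS coordinates to (row, column) map cells."""
    dy_km = (BBOX['max_lat'] - lat) * KM_PER_DEG
    dx_km = (lon - BBOX['min_lon']) * KM_PER_DEG * math.cos(REF_LAT)
    return int(dy_km // RESOLUTION_KM), int(dx_km // RESOLUTION_KM)

def inverse_equirectangular(row, column):
    """Translate map cells back to the GPS coordinates of the cell corner."""
    lat = BBOX['max_lat'] - (row * RESOLUTION_KM) / KM_PER_DEG
    lon = BBOX['min_lon'] + (column * RESOLUTION_KM) / (KM_PER_DEG * math.cos(REF_LAT))
    return lat, lon
```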
Anomaly management
AnomalyAnalyzer
AnomalyAnalyzer analyzes symptoms of anomalies to assign them to anomalies based on key dimensions within the symptom, which are defined in the “key_dimensions” configuration option for every symptom type.
AnomalyManager
AnomalyManager stores symptoms of anomalies to create an anomaly object based on key dimensions within the symptom.
Dimensions are stored as part of individual symptoms inside the anomaly object. Other dimensions than key dimensions are optional and serve only to differentiate the symptoms from one another.
When a symptom contains status set to closed and the anomaly is specified to finish by that flag, the anomaly's “ts_end” is copied from this symptom and the anomaly object is moved from “open” to “closed” inside the anomaly storage (both in-memory and in ElasticSearch, see above).
AnomalyObject
AnomalyObject is created by the AnomalyManager according to the type of the anomaly. Unless a specific AnomalyObject is created for the type, GeneralAnomaly is going to be used.
AnomalyStorage
AnomalyStorage serves to store anomaly objects, separated to “open” (anomalies that are not closed by status attribute in a symptom) and “closed”.
The closed anomalies are periodically flushed from the storage to an external system.