Welcome to BSPump reference documentation!

class bspump.BSPumpApplication(args=None, web_listen=None)[source]

Application object for BSPump.

create_argument_parser()[source]

This method can be overridden to adjust the argparse configuration. Refer to argparse.ArgumentParser in the Python standard library for details of the arguments.

async main()[source]
parse_arguments(args=None)[source]
class bspump.Pipeline(app, id=None, config=None)[source]

Multiple sources

A pipeline can have multiple sources. They are simply passed as a list of sources to the pipeline build() method.

class MyPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            [
                MySource1(app, self),
                MySource2(app, self),
                MySource3(app, self),
            ],
            bspump.common.NullSink(app, self),
        )
ConfigDefaults = {'async_concurency_limit': 1000}
append_processor(processor)[source]
build(source, *processors)[source]
catch_error(exception, event)[source]

Override to evaluate a pipeline processing error. Return True for hard errors (the pipeline processing is stopped) or False for soft errors that will be ignored.

import json

class SampleInternalPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)

        self.build(
            bspump.common.InternalSource(app, self),
            bspump.common.JSONParserProcessor(app, self),
            bspump.common.PPrintSink(app, self)
        )

    def catch_error(self, exception, event):
        if isinstance(exception, json.decoder.JSONDecodeError):
            return False
        return True
ensure_future(coro)[source]

You can use this method to schedule a future task that will be executed in the context of the pipeline. The pipeline also manages the whole lifecycle of the future/task: it collects the future result, discards it, and, most importantly, captures any possible exception, which then blocks the pipeline via set_error().

If the number of futures exceeds the configured limit, the pipeline is throttled.

Parameters
  • coro

Returns
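
A minimal sketch of scheduling a side task from a processor via ensure_future; the NotifyingProcessor class, the notify_external coroutine and the URL are hypothetical:

import aiohttp
import bspump


class NotifyingProcessor(bspump.Processor):

    def process(self, context, event):
        # Schedule a fire-and-forget coroutine; the pipeline collects its result,
        # captures any exception via set_error() and throttles itself if too many
        # futures are pending.
        self.Pipeline.ensure_future(self.notify_external(event))
        return event

    async def notify_external(self, event):
        # Hypothetical side effect: notify an external service about the event
        async with aiohttp.ClientSession() as session:
            async with session.post("https://example.com/notify", json=event) as resp:
                await resp.text()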

async inject(context, event, depth)[source]

The inject method serves to inject events into the pipeline at the depth defined by the depth attribute. Every depth is interconnected with a generator object.

For normal operation, it is highly recommended to use the process method instead (see below).

Parameters
  • context

  • event

  • depth

Returns

insert_after(id, processor)[source]

Insert the processor into the pipeline after another processor specified by id.

Returns

True on success. False otherwise (id not found)

insert_before(id, processor)[source]

Insert the processor into the pipeline before another processor specified by id.

Returns

True on success. False otherwise (id not found)
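
A brief sketch of using insert_after on an already built pipeline; MyEnrichmentProcessor and the processor id are hypothetical:

# `pipeline` is assumed to be an already built bspump.Pipeline instance
ok = pipeline.insert_after(
    "JSONParserProcessor",                  # id of an existing processor
    MyEnrichmentProcessor(app, pipeline),   # hypothetical processor to insert
)
if not ok:
    print("Processor with the given id was not found")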

is_error()[source]
is_ready()[source]
iter_processors()[source]

Iterate through all processors.

locate_connection(app, connection_id)[source]
locate_processor(processor_id)[source]

Find a processor by id.

locate_source(address)[source]

Find a source by id.

async process(event, context=None)[source]

The process method serves to inject events into the pipeline at depth 0, while incrementing the event.in metric.

This is the recommended way of inserting events into a pipeline.

Parameters
  • event

  • context

Returns
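
A minimal sketch of injecting an event into a pipeline from outside a source, e.g. from an application coroutine; the payload is illustrative:

async def feed(pipeline):
    # Insert a single event at depth 0; this also increments the event.in metric
    await pipeline.process({"color_id": 1}, context={})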

async ready()[source]

Can be used in a source: await self.Pipeline.ready()

rest_get()[source]
set_error(context, event, exc)[source]

If called with exc set to None, the error is reset (i.e. recovery).

set_source(source)[source]
start()[source]
async stop()[source]
throttle(who, enable=True)[source]
time()[source]
class bspump.PumpBuilder(definition)[source]

PumpBuilder offers an alternative way to create a pipeline together with its connections, processors and sources. definition is a path to a JSON file containing the description of the pump. Example of such a file:

{
        "pipelines" : [
                {
                        "id": "MyPipeline0",
                        "args": {},
                        "config": {},
                        "sources": [
                                {
                                        "id": "FileCSVSource",
                                        "module": "bspump.file",
                                        "class" : "FileCSVSource",
                                        "args": {},
                                        "config": {"path":"etc/test.csv", "post":"noop"},
                                        "trigger": {
                                                "module": "bspump.trigger",
                                                "class": "OpportunisticTrigger",
                                                "id": "",
                                                "args": {}
                                        }
                                }
                        ],
                        "processors": [
                                {
                                        "module":"bspump-pumpbuilder",
                                        "class": "Processor00",
                                        "args": {},
                                        "config": {}
                                }
                        ],
                        "sink": {
                                "module":"bspump.common",
                                "class": "PPrintSink",
                                "args": {},
                                "config": {}
                        }
                }
        ]
} 
construct_connection(app, svc, connection)[source]
construct_connections(app, svc)[source]
construct_lookup(app, svc, lookup)[source]
construct_lookups(app, svc)[source]
construct_pipeline(app, svc, pipeline_definition)[source]
construct_pipelines(app, svc)[source]
construct_processor(app, svc, pipeline, definition)[source]
construct_processors(app, svc, pipeline, definition)[source]
construct_pump(app, svc)[source]

The main method to construct the pump. app is a BSPumpApplication object, svc is the service. Example of use:

app = BSPumpApplication()
svc = app.get_service("bspump.PumpService")
pump_builder = PumpBuilder(definition)
pump_builder.construct_pump(app, svc)
app.run()
construct_source(app, svc, pipeline, definition)[source]
construct_sources(app, svc, pipeline, definition)[source]
construct_trigger(app, svc, definition)[source]
class bspump.Source(app, pipeline, id=None, config=None)[source]

Each source represents a coroutine/Future/Task that is running in the context of the main loop. The coroutine method main() contains the implementation of each particular source.

A source MUST await the pipeline ready state prior to producing an event. This is accomplished by the await self.Pipeline.ready() call.
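
A minimal sketch of a custom source that produces events from an in-memory list (the data is purely illustrative):

import bspump


class MyListSource(bspump.Source):

    async def main(self):
        for item in ["one", "two", "three"]:
            # Wait for the pipeline ready state before producing each event
            await self.Pipeline.ready()
            await self.process(item)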

classmethod construct(app, pipeline, definition: dict)[source]
locate_address()[source]
abstract async main()[source]
async process(event, context=None)[source]

This method is used to emit event into a pipeline.

If there is an error in the processing of the event, the pipeline is throttled by setting the error and an exception is raised. The source should catch this exception and fail gracefully.

rest_get()[source]
restart(loop)[source]
start(loop)[source]
async stop()[source]
async stopped()[source]

Helper that simplifies the implementation of sources:

async def main(self):
    # ... initialize resources here

    await self.stopped()

    # ... finalize resources here
class bspump.TriggerSource(app, pipeline, id=None, config=None)[source]

This is an abstract source class intended as a base for implementation of ‘cyclic’ sources such as file readers, SQL extractors etc. You need to provide a trigger class and implement the cycle() method.

A trigger source will stop execution when the pipeline is cancelled (it raises concurrent.futures.CancelledError). This typically happens when the program wants to quit in reaction to a signal.

You may also override the main() method to provide additional parameters for the cycle() method.

async def main(self):
    async with aiohttp.ClientSession(loop=self.Loop) as session:
        await super().main(session)


async def cycle(self, session):
    session.get(...)
abstract async cycle(*args, **kwags)[source]
async main(*args, **kwags)[source]
on(trigger)[source]

Add a trigger.
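
A brief sketch of attaching a trigger to a trigger source; FileCSVSource and OpportunisticTrigger appear elsewhere in this documentation, the config values are illustrative:

source = bspump.file.FileCSVSource(app, pipeline, config={"path": "etc/test.csv"})

# Attach the trigger that starts the cycle() method
source.on(bspump.trigger.OpportunisticTrigger(app))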

rest_get()[source]
time()[source]
class bspump.Sink(app, pipeline, id=None, config=None)[source]
class bspump.Processor(app, pipeline, id=None, config=None)[source]
class bspump.Generator(app, pipeline, id=None, config=None)[source]

The Generator object is used to generate one or multiple events in an asynchronous way and pass them to the following processors in the pipeline. In the case of a Generator, the user overrides the generate method, not process.

1.) A Generator can iterate through an event to create (generate) derived ones and pass them to the following processors.

Example of a custom Generator class with a generate method:

    class MyGenerator(bspump.Generator):

        async def generate(self, context, event, depth):
            for item in event.items:
                await self.Pipeline.inject(context, item, depth)

2.) A Generator can, in the same way, also generate completely independent events, if necessary.
In this way, the generator processes originally synchronous events "out-of-band", i.e. outside of the synchronous processing within the pipeline.

A specific implementation of the generator should implement the generate method to process events while performing
long-running (asynchronous) tasks such as HTTP requests or SQL selects.
The long-running tasks may enrich events with relevant information, such as the output of external calculations.

Example of a generate method:

async def generate(self, context, event, depth):

    # Perform possibly long-running asynchronous operation
    async with aiohttp.ClientSession() as session:
        async with session.get("https://example.com/resolve_color/{}".format(event.get("color_id", "unknown"))) as resp:
            if resp.status != 200:
                return
            new_event = await resp.json()

    # Inject the new event into the next depth of the pipeline
    await self.Pipeline.inject(context, new_event, depth)
abstract async generate(context, event, depth)[source]
process(context, event)[source]
set_depth(depth)[source]
class bspump.Connection(app, id=None, config=None)[source]
classmethod construct(app, definition: dict)[source]
time()[source]
class bspump.Matrix(app, dtype='float_', id=None, config=None)[source]

Generic Matrix object.

The matrix structure is organized in the following hierarchical order:

Matrix -> Rows -> Columns -> Cells

Cells have a unified data format across the whole matrix. This format is specified by a dtype. It can be a simple integer or float, but also a complex dictionary type with names and types of the fields.

The description of types that can be used for the dtype of a cell:

    Name   Definition
    ‘b’    Byte
    ‘i’    Signed integer
    ‘u’    Unsigned integer
    ‘f’    Floating point
    ‘c’    Complex floating point
    ‘S’    String
    ‘U’    Unicode string
    ‘V’    Raw data

Example: ‘i8’ stands for int64.

For more details, see https://docs.scipy.org/doc/numpy/reference/arrays.dtypes.html
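
A short numpy illustration of the dtype codes above; the structured field names are hypothetical:

import numpy as np

# 'i8' stands for a 64-bit signed integer
assert np.dtype("i8") == np.int64

# A structured dtype with named fields; such a dtype makes every matrix cell
# carry both a counter and a label
cell_dtype = np.dtype([("count", "i8"), ("label", "U32")])
print(cell_dtype.names)  # ('count', 'label')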

Main object attributes: Array is a numpy ndarray, the actual data representation of the matrix object. ClosedRows is a set where row ids can be stored before deletion during the matrix rebuild.

add_row()[source]
async analyze()[source]

The Matrix itself can run analyze(). It is not recommended to iterate through the matrix row by row (or cell by cell); use numpy functions instead. Examples:

1. You have a vector with n rows and you need only those row indexes where the cell content is greater than 10: use np.where(vector > 10).

2. You have a matrix with n rows and m columns and you need to find out which rows consist entirely of zeros: use np.where(np.all(matrix == 0, axis=1)) to get those row indexes. Instead of np.all() you can use np.any() to get all row indexes where there is at least one zero.

3. Use np.mean(matrix, axis=1) to get the means of all rows.

4. Useful numpy functions: np.unique(), np.sum(), np.argmin(), np.argmax().
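
A runnable illustration of the numpy idioms listed above, on a small example matrix:

import numpy as np

matrix = np.array([
    [0, 0, 0],
    [1, 0, 2],
    [3, 4, 5],
])

# Row indexes where the first column is greater than 2
print(np.where(matrix[:, 0] > 2)[0])             # [2]

# Row indexes of rows that consist entirely of zeros
print(np.where(np.all(matrix == 0, axis=1))[0])  # [0]

# Mean of every row
print(np.mean(matrix, axis=1))                   # [0. 1. 4.]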

build_shape(rows=0)[source]

Override this method to have control over the shape of the matrix.

close_row(row_index, clear=True)[source]
flush()[source]

The matrix will be recreated without rows from ClosedRows.

time()[source]
zeros(rows=0)[source]
class bspump.Lookup(app, id=None, config=None, lazy=False)[source]

Lookups serve for fast data searching in lists of key-value type. They can subsequently be located and used in pipeline objects (processors and the like). Each lookup requires a statically or dynamically created value list.

If the “lazy” parameter in the constructor is set to True, no load method is called and the user is expected to call it when necessary.

ConfigDefaults = {'master_lookup_id': '', 'master_timeout': 30, 'master_url': ''}
deserialize(data)[source]
ensure_future_update(loop)[source]
is_master()[source]
abstract async load() → bool[source]

Return True if the lookup has been changed.

Example:

async def load(self):
    self.set(bspump.load_json_file('./examples/data/country_names.json'))
    return True
load_from_cache()[source]

Load the lookup data from a cache. Data (bytes) are read from a file and passed to the deserialize function.

async load_from_master()[source]
rest_get()[source]
save_to_cache(data)[source]
serialize()[source]
time()[source]
class bspump.MappingLookup(app, id=None, config=None, lazy=False)[source]
class bspump.DictionaryLookup(app, id=None, config=None, lazy=False)[source]
deserialize(data)[source]
rest_get()[source]
serialize()[source]
set(dictionary: dict)[source]
bspump.load_json_file(fname)[source]
class bspump.kafka.KafkaConnection(app, id=None, config=None)[source]

KafkaConnection serves to connect BSPump application with an instance of Apache Kafka messaging system. It can later be used by processors to consume or provide user-defined messages.

config = {"compression_type": "gzip"}
app = bspump.BSPumpApplication()
svc = app.get_service("bspump.PumpService")
svc.add_connection(
        bspump.kafka.KafkaConnection(app, "KafkaConnection", config)
)

ConfigDefaults options:

  • compression_type: Kafka supports several compression types: gzip, snappy and lz4. This option needs to be specified only in the Kafka producer; the consumer will decompress automatically.

ConfigDefaults = {'bootstrap_servers': 'localhost:9092', 'compression_type': ''}
create_consumer(*topics, **kwargs)[source]
async create_producer(**kwargs)[source]
get_bootstrap_servers()[source]
get_compression()[source]

Returns the compression type to use in the connection.

class bspump.kafka.KafkaSource(app, pipeline, connection, id=None, config=None)[source]

KafkaSource object consumes messages from an Apache Kafka system, which is configured in the KafkaConnection object. It then passes them to other processors in the pipeline.

class KafkaPipeline(bspump.Pipeline):

        def __init__(self, app, pipeline_id):
                super().__init__(app, pipeline_id)
                self.build(
                        bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'messages'}),
                        bspump.kafka.KafkaSink(app, self, "KafkaConnection", config={'topic': 'messages2'}),
                )

To ensure that after a restart the pump continues receiving messages where it left off, a group_id has to
be provided in the configuration.

When the group_id is set, a consumer group is created and the Kafka server will then operate
in the producer-consumer mode. This means that every consumer with the same group_id will be assigned
a unique set of partitions, hence all messages are divided among them and each message is delivered to only one consumer in the group.

Long-running synchronous operations should be avoided or placed inside the OOBGenerator in an asynchronous
way, or run on a thread using the ASAB Proactor service (see the bspump-oob-proactor.py example in the "examples" folder).
Otherwise, session_timeout_ms should be raised to prevent Kafka from disconnecting the consumer
from the partition, thus causing a rebalance. A configuration sketch follows below.
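
A minimal configuration sketch with a group_id and a raised session timeout; the values are illustrative:

bspump.kafka.KafkaSource(
    app, self, "KafkaConnection",
    config={
        "topic": "messages",
        "group_id": "my-pump",        # keeps the read position across restarts
        "session_timeout_ms": 30000,  # raise if event processing may take long
    },
)
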
ConfigDefaults = {'api_version': 'auto', 'auto_offset_reset': 'earliest', 'client_id': 'BSPump-KafkaSource', 'consumer_timeout_ms': '', 'event_block_size': 1000, 'event_idle_time': 0.01, 'get_timeout_ms': 20000, 'group_id': '', 'max_partition_fetch_bytes': '', 'request_timeout_ms': '', 'retry': 20, 'session_timeout_ms': 10000, 'topic': ''}
create_consumer()[source]
async initialize_consumer()[source]
async main()[source]
class bspump.kafka.KafkaSink(app, pipeline, connection, key_serializer=None, id=None, config=None)[source]

KafkaSink is a sink processor that forwards the event to an Apache Kafka instance specified by a KafkaConnection object.

KafkaSink expects bytes as input. If the input is a string or a dictionary, it is automatically transformed to bytes using the encoding charset specified in the configuration.

class KafkaPipeline(bspump.Pipeline):

        def __init__(self, app, pipeline_id):
                super().__init__(app, pipeline_id)
                self.build(
                        bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'messages'}),
                        bspump.kafka.KafkaSink(app, self, "KafkaConnection", config={'topic': 'messages2'}),
                )

There are two ways to use KafkaSink:

  • Specify a single topic in the KafkaSink config (topic), to be used for all the events in the pipeline.

  • Specify the topic separately for each event in the event context - context['kafka_topic'].
    The topic from the configuration is then used as the default topic.
    To provide business logic for the event distribution, you can create a topic selector processor.

Processor example:
class KafkaTopicSelector(bspump.Processor):

        def process(self, context, event):
                if event.get("weight") > 10:
                        context["kafka_topic"] = "heavy"
                else:
                        context["kafka_topic"] = "light"

                return event

Every Kafka message can be a key:value pair. The key is read from the event context - context['kafka_key'].
If kafka_key is not provided, the key defaults to None.
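
A minimal sketch of a processor that sets the Kafka message key from an event field; the 'user_id' field is hypothetical:

class KafkaKeySetter(bspump.Processor):

    def process(self, context, event):
        # KafkaSink reads the key from context['kafka_key'] and expects bytes
        context["kafka_key"] = str(event.get("user_id", "")).encode("utf-8")
        return event
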
ConfigDefaults = {'acks': '', 'api_version': '', 'client_id': '', 'connections_max_idle_ms': '', 'enable_idempotency': '', 'encoding': 'utf-8', 'key_serializer': '', 'linger_ms': '', 'max_batch_size': '', 'max_request_size': '', 'metadata_max_age_ms': '', 'output_queue_max_size': 100, 'request_timeout_ms': '', 'retry_backoff_ms': '', 'send_backoff_ms': '', 'topic': '', 'transaction_timeout_ms': '', 'transactional_id': '', 'value_serializer': ''}
process(context, event: Union[dict, str, bytes])[source]
class bspump.kafka.KafkaKeyFilter(app, pipeline, keys, id=None, config=None)[source]

KafkaKeyFilter reduces the incoming event stream from Kafka based on a key provided in each event.

Every Kafka message has a key. KafkaKeyFilter selects only those events whose key matches one of the provided ‘keys’; other events are discarded.

Set the filtering keys as a parameter (in bytes) in the KafkaKeyFilter constructor.

KafkaKeyFilter is meant to be inserted after KafkaSource in a Pipeline.
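
A brief sketch of placing KafkaKeyFilter right after KafkaSource; the keys and topic are illustrative:

self.build(
    bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'messages'}),
    bspump.kafka.KafkaKeyFilter(app, self, keys=[b"alpha", b"beta"]),
    bspump.common.PPrintSink(app, self),
)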

process(context, event)[source]
class bspump.influxdb.InfluxDBSink(app, pipeline, connection, id=None, config=None)[source]

InfluxDBSink is a sink processor that stores the event into an InfluxDB database specified by the InfluxDBConnection object.

class SamplePipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.socket.TCPStreamSource(app, self, config={'port': 7000}),
            bspump.influxdb.InfluxDBSink(app, self, "InfluxConnection1")
        )
process(context, event)[source]
class bspump.influxdb.InfluxDBConnection(app, id=None, config=None)[source]

InfluxDBConnection serves to connect BSPump application with an InfluxDB database. The InfluxDB server is accessed via URL, and the database is specified using the db parameter in the configuration.

app = bspump.BSPumpApplication()
svc = app.get_service("bspump.PumpService")
svc.add_connection(
    bspump.influxdb.InfluxDBConnection(app, "InfluxConnection1")
)
ConfigDefaults = {'db': 'mydb', 'output_bucket_max_size': 1000000, 'output_queue_max_size': 10, 'timeout': 30, 'url': 'http://localhost:8086/'}
consume(data)[source]

Consumes user-defined data to be stored in the InfluxDB database.

flush(event_name=None)[source]

Directly flushes the content of the internal bucket with data to the InfluxDB database.
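
A short sketch of pushing a data point through the connection directly; it assumes the data is a string in the InfluxDB line protocol, so check the connection implementation for the exact expected format. The values are illustrative:

# `influx` is assumed to be the InfluxDBConnection instance registered above
influx.consume("cpu,host=server01 usage=0.64\n")

# Force-send the internal bucket instead of waiting for it to fill up
influx.flush()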

class bspump.elasticsearch.ElasticSearchConnection(app, id=None, config=None)[source]

ElasticSearchConnection allows your ES source, sink or lookup to connect to an ElasticSearch instance.

Usage:

# adding connection to PumpService
svc = app.get_service("bspump.PumpService")
svc.add_connection(
        bspump.elasticsearch.ElasticSearchConnection(app, "ESConnection")
)
# pass connection name ("ESConnection" in our example) to relevant BSPump's object:

self.build(
                bspump.kafka.KafkaSource(app, self, "KafkaConnection"),
                bspump.elasticsearch.ElasticSearchSink(app, self, "ESConnection")
)
ConfigDefaults = {'allowed_bulk_response_codes': '201', 'bulk_out_max_size': 1048576, 'loader_per_url': 4, 'output_queue_max_size': 10, 'password': '', 'timeout': 300, 'url': 'http://localhost:9200/', 'username': ''}
consume(data)[source]
flush()[source]
get_session()[source]
get_url()[source]
class bspump.elasticsearch.ElasticSearchSink(app, pipeline, connection, id=None, config=None)[source]

ElasticSearchSink allows you to insert events into ElasticSearch through POST requests.

ConfigDefaults = {'doctype': 'doc', 'index_prefix': 'bspump_', 'max_index_size': 32212254720, 'rollover_mechanism': 'time', 'time_period': 'd', 'timeout': 30}
process(context, event)[source]
class bspump.elasticsearch.ElasticSearchSource(app, pipeline, connection, request_body=None, paging=True, id=None, config=None)[source]

ElasticSearchSource uses the standard Elastic search API to fetch data.

configs

index - Elastic’s index (default is ‘index-*’).

scroll_timeout - Timeout of single scroll request (default is ‘1m’). Allowed time units: https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#time-units

specific parameters

paging - boolean (default is True)

request_body - dictionary described by Elastic’s doc: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html

Default is:

default_request_body = {
        'query': {
                'bool': {
                        'must': {
                                'match_all': {}
                        }
                }
        },
}
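
A brief sketch of wiring ElasticSearchSource into a pipeline with a custom request_body; the index, query and trigger choice are illustrative:

self.build(
    bspump.elasticsearch.ElasticSearchSource(
        app, self, "ESConnection",
        request_body={
            "query": {"range": {"timestamp": {"gte": "now-1d"}}},
        },
        config={"index": "my-index-*"},
    ).on(bspump.trigger.OpportunisticTrigger(app)),
    bspump.common.PPrintSink(app, self),
)
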
ConfigDefaults = {'index': 'index-*', 'scroll_timeout': '1m'}
async cycle()[source]
class bspump.elasticsearch.ElasticSearchAggsSource(app, pipeline, connection, request_body=None, id=None, config=None)[source]

ElasticSearchAggsSource is used for Elastic’s search aggregations.

configs

index: - Elastic’s index (default is ‘index-*’).

specific parameters

request_body dictionary described by Elastic’s doc: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html

Default is:

default_request_body = {
        'query': {
                'bool': {
                        'must': {
                                'match_all': {}
                        }
                }
        },
}
ConfigDefaults = {'index': 'index-*'}
async cycle()[source]
async process_aggs(path, aggs_name, aggs)[source]
async process_buckets(path, parent, buckets)[source]

Recursive function for bucket processing. It iterates through the keys of the dictionary, looking for ‘buckets’ or ‘value’. If there are ‘buckets’, it calls itself; if there is a ‘value’, it calls process_aggs and sends an event to process.

class bspump.elasticsearch.ElasticSearchLookup(app, connection, id=None, config=None, cache=None)[source]

A lookup that is linked with ES. It provides a mapping (dictionary-like) interface to pipelines. It feeds the lookup data from ES using a query. It also has a simple cache to reduce the number of database hits.

configs

index - Elastic’s index

key - field name to match

scroll_timeout - Timeout of single scroll request (default is ‘1m’). Allowed time units: https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#time-units

Example:

class ProjectLookup(bspump.elasticsearch.ElasticSearchLookup):

        async def count(self, database):
                return await database['projects'].count_documents({})

        def find_one(self, database, key):
                return database['projects'].find_one({'_id':key})
ConfigDefaults = {'index': '', 'key': '', 'scroll_timeout': '1m'}
build_find_one_query(key) → dict[source]

Override this method to build your own lookup query.

Returns

Default single-key query
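
A short sketch of overriding build_find_one_query to match a custom field; the field name is hypothetical:

class ProjectLookup(bspump.elasticsearch.ElasticSearchLookup):

    def build_find_one_query(self, key) -> dict:
        # Match a single document by a hypothetical 'project_code' field
        return {
            "size": 1,
            "query": {
                "term": {"project_code": key}
            },
        }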

classmethod construct(app, definition: dict)[source]
async load()[source]

Return True if the lookup has been changed.

Example:

async def load(self):
    self.set(bspump.load_json_file('./examples/data/country_names.json'))
    return True
