Welcome to BSPump reference documentation!

class bspump.BSPumpApplication(web_listen=None)[source]

Application object for BSPump.

create_argument_parser()[source]

This method can be overridden to adjust the argparse configuration. Refer to argparse.ArgumentParser in the Python standard library for details of the arguments.
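
A minimal sketch of such an override, assuming (as in the underlying ASAB application) that create_argument_parser() returns the argparse.ArgumentParser instance; the --dry-run option is hypothetical:

class MyApplication(bspump.BSPumpApplication):

    def create_argument_parser(self):
        # Extend the default parser with a custom, application-specific option
        parser = super().create_argument_parser()
        parser.add_argument('--dry-run', action='store_true')
        return parser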

main()[source]
parse_arguments()[source]
class bspump.Pipeline(app, id=None)[source]

Multiple sources

A pipeline can have multiple sources. They are simply passed as a list of sources to the pipeline's build() method.

class MyPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            [
                MySource1(app, self),
                MySource2(app, self),
                MySource3(app, self),
            ],
            bspump.common.NullSink(app, self),
        )
append_processor(processor)[source]
build(source, *processors)[source]
catch_error(exception, event)[source]

Override to evaluate pipeline processing errors. Return True for hard errors (pipeline processing is stopped) or False for soft errors that will be ignored.

import json

class SampleInternalPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)

        self.build(
            bspump.common.InternalSource(app, self),
            bspump.common.JSONParserProcessor(app, self),
            bspump.common.PPrintSink(app, self)
        )

    def catch_error(self, exception, event):
        if isinstance(exception, json.decoder.JSONDecodeError):
            return False
        return True
is_error()[source]
is_ready()[source]
iter_processors()[source]

Iterate through all processors.

locate_connection(app, connection_id)[source]
locate_processor(processor_id)[source]

Find a processor by its id.

locate_source(address)[source]

Find a source by its id.

process(event, context=None)[source]
ready()[source]

Can be used in a source: await self.Pipeline.ready()

rest_get()[source]
set_error(context, event, exc)[source]

If called with exc set to None, the error is reset (i.e. recovery).

set_source(source)[source]
start()[source]
stop()[source]
throttle(who, enable=True)[source]
class bspump.PumpBuilder(definition)[source]

PumpBuilder offers an alternative way of creating a pipeline together with its connections, processors and sources. definition is a path to a JSON file containing the description of the pump. Example of such a file:

{
        "pipelines" : [
                {
                        "id": "MyPipeline0",
                        "args": {},
                        "config": {},
                        "sources": [
                                {
                                        "id": "FileCSVSource",
                                        "module": "bspump.file",
                                        "class" : "FileCSVSource",
                                        "args": {},
                                        "config": {"path":"etc/test.csv", "post":"noop"},
                                        "trigger": {
                                                "module": "bspump.trigger",
                                                "class": "OpportunisticTrigger",
                                                "id": "",
                                                "args": {}
                                        }
                                }
                        ],
                        "processors": [
                                {
                                        "module":"bspump-pumpbuilder",
                                        "class": "Processor00",
                                        "args": {},
                                        "config": {}
                                }
                        ],
                        "sink": {
                                "module":"bspump.common",
                                "class": "PPrintSink",
                                "args": {},
                                "config": {}
                        }
                }
        ]
} 
construct_connection(app, svc, connection)[source]
construct_connections(app, svc)[source]
construct_lookup(app, svc, lookup)[source]
construct_lookups(app, svc)[source]
construct_pipeline(app, svc, pipeline_definition)[source]
construct_pipelines(app, svc)[source]
construct_processor(app, svc, pipeline, definition)[source]
construct_processors(app, svc, pipeline, definition)[source]
construct_pump(app, svc)[source]

The main method to construct the pump. app is a BSPumpApplication object, svc is the service. Example of use:

app = BSPumpApplication()
svc = app.get_service("bspump.PumpService")
pump_builder = PumpBuilder(definition)
pump_builder.construct_pump(app, svc)
app.run()
construct_source(app, svc, pipeline, definition)[source]
construct_sources(app, svc, pipeline, definition)[source]
construct_trigger(app, svc, definition)[source]
class bspump.Source(app, pipeline, id=None, config=None)[source]

Each source represents a coroutine/Future/Task that is running in the context of the main loop. The coroutine method main() contains the implementation of each particular source.

A source MUST await a pipeline ready state prior to producing an event. This is accomplished by the await self.Pipeline.ready() call.
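
A minimal sketch of a source honouring this contract (the emitted event payload is illustrative):

class MySource(bspump.Source):

    async def main(self):
        while True:
            # Wait until the pipeline is ready to accept events
            await self.Pipeline.ready()
            await self.process({'key': 'value'})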

classmethod construct(app, pipeline, definition: dict)[source]
locate_address()[source]
main()[source]
process(event, context=None)[source]

This method is used to emit an event into the pipeline.

If there is an error during the processing of the event, the pipeline is throttled by setting the error, and an exception is raised. The source should catch this exception and fail gracefully.
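
A sketch of the graceful failure described above, extending the previous source example (the event payload is illustrative):

async def main(self):
    while True:
        await self.Pipeline.ready()
        try:
            await self.process({'key': 'value'})
        except Exception:
            # The pipeline error has been set and the pipeline throttled;
            # stop producing events
            break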

rest_get()[source]
restart(loop)[source]
start(loop)[source]
stop()[source]
stopped()[source]

Helper that simplifies the implementation of sources:

async def main(self):
    # ... initialize resources here

    await self.stopped()

    # ... finalize resources here
class bspump.TriggerSource(app, pipeline, id=None, config=None)[source]

This is an abstract source class intended as a base for the implementation of ‘cyclic’ sources such as file readers, SQL extractors etc. You need to provide a trigger class and implement the cycle() method.

A trigger source will stop execution when the pipeline is cancelled (it raises concurrent.futures.CancelledError). This typically happens when the program wants to quit in reaction to a signal.

You may also overload the main() method to provide additional parameters for the cycle() method.

async def main(self):
    async with aiohttp.ClientSession(loop=self.Loop) as session:
        await super().main(session)

async def cycle(self, session):
    await session.get(...)
cycle(*args, **kwargs)[source]
main(*args, **kwargs)[source]
on(trigger)[source]

Add a trigger.
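
Assuming on() returns the source itself so the call can be chained, attaching a trigger when building a pipeline might look like this sketch (it reuses the FileCSVSource and OpportunisticTrigger mentioned in the PumpBuilder example above):

self.build(
    bspump.file.FileCSVSource(app, self, config={'path': 'etc/test.csv'}).on(
        bspump.trigger.OpportunisticTrigger(app)
    ),
    bspump.common.PPrintSink(app, self),
)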

rest_get()[source]
class bspump.Sink(app, pipeline, id=None, config=None)[source]
class bspump.Processor(app, pipeline, id=None, config=None)[source]
class bspump.Generator(app, pipeline, id=None, config=None)[source]

Example of use:

class GeneratingProcessor(bspump.Generator):

    def process(self, context, event):

        def generate(items):
            for item in items:
                yield item

        return generate(event.items)
class bspump.Connection(app, connection_id, config=None)[source]
classmethod construct(app, definition: dict)[source]
exception bspump.ProcessingError[source]

Generic exception that indicates an error in pipeline processing.

class bspump.Lookup(app, lookup_id, config=None)[source]
ConfigDefaults = {'master_lookup_id': '', 'master_timeout': 30, 'master_url': ''}
deserialize(data)[source]
ensure_future_update(loop)[source]
is_master()[source]
load() → bool[source]

Return True if the lookup has been changed.

Example:

async def load(self):
    self.set(bspump.load_json_file('./examples/data/country_names.json'))
    return True
load_from_cache()[source]

Load the lookup data from a cache. Data (bytes) are read from a file and passed to the deserialize() method.

load_from_master()[source]
save_to_cache(data)[source]
serialize()[source]
class bspump.MappingLookup(app, lookup_id, config=None)[source]
class bspump.DictionaryLookup(app, lookup_id, config=None)[source]
deserialize(data)[source]
serialize()[source]
set(dictionary: dict)[source]
bspump.load_json_file(fname)[source]
class bspump.kafka.KafkaConnection(app, connection_id, config=None)[source]

KafkaConnection serves to connect a BSPump application with an instance of the Apache Kafka messaging system. It can later be used by processors to consume or provide user-defined messages.

app = bspump.BSPumpApplication()
svc = app.get_service("bspump.PumpService")
svc.add_connection(
    bspump.kafka.KafkaConnection(app, "KafkaConnection")
)
ConfigDefaults = {'bootstrap_servers': 'localhost:9092', 'disabled': 0, 'output_queue_max_size': 100}
consume(topic, message)[source]

Consumes a user-defined message by storing it in a queue and later publishing it to Apache Kafka.

get_bootstrap_servers()[source]
class bspump.kafka.KafkaSource(app, pipeline, connection, id=None, config=None)[source]

KafkaSource object consumes messages from an Apache Kafka system, which is configured in the KafkaConnection object. It then passes them to other processors in the pipeline.

class KafkaPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'messages'}),
            bspump.kafka.KafkaSink(app, self, "KafkaConnection", config={'topic': 'messages2'}),
        )
ConfigDefaults = {'api_version': 'auto', 'auto_offset_reset': 'latest', 'client_id': 'BSPump-KafkaSource', 'group_id': '', 'max_partition_fetch_bytes': 1048576, 'topic': ''}
main()[source]
process_message(message)[source]
class bspump.kafka.KafkaSink(app, pipeline, connection, id=None, config=None)[source]

KafkaSink is a sink processor that expects the event to be a user-defined message (such as a string) and publishes it to a defined Apache Kafka instance configured in a KafkaConnection object.

class KafkaPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'messages'}),
            bspump.kafka.KafkaSink(app, self, "KafkaConnection", config={'topic': 'messages2'}),
        )
ConfigDefaults = {'topic': ''}
process(context, event)[source]
class bspump.influxdb.InfluxDBSink(app, pipeline, connection, id=None, config=None)[source]

InfluxDBSink is a sink processor that stores events into an InfluxDB database specified in the InfluxDBConnection object.

class SamplePipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.socket.TCPStreamSource(app, self, config={'port': 7000}),
            bspump.influxdb.InfluxDBSink(app, self, "InfluxConnection1")
        )
process(context, event)[source]
class bspump.influxdb.InfluxDBConnection(app, connection_id, config=None)[source]

InfluxDBConnection serves to connect a BSPump application with an InfluxDB database. The InfluxDB server is accessed via URL, and the database is specified using the db parameter in the configuration.

app = bspump.BSPumpApplication()
svc = app.get_service("bspump.PumpService")
svc.add_connection(
    bspump.influxdb.InfluxDBConnection(app, "InfluxConnection1")
)
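
The URL and database name are taken from the url and db configuration options (see ConfigDefaults below); a sketch of setting them explicitly:

svc.add_connection(
    bspump.influxdb.InfluxDBConnection(app, "InfluxConnection1", config={
        'url': 'http://localhost:8086/',
        'db': 'mydb',
    })
)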
ConfigDefaults = {'db': 'mydb', 'output_bucket_max_size': 1000000, 'output_queue_max_size': 10, 'timeout': 30, 'url': 'http://localhost:8086/'}
consume(data)[source]

Consumes user-defined data to be stored in the InfluxDB database.

flush(event_name=None)[source]

Directly flushes the content of the internal data bucket to the InfluxDB database.

class bspump.elasticsearch.ElasticSearchConnection(app, connection_id, config=None)[source]

ElasticSearchConnection allows your ES source, sink or lookup to connect to an ElasticSearch instance.

Usage:

# adding connection to PumpService
svc = app.get_service("bspump.PumpService")
svc.add_connection(
    bspump.elasticsearch.ElasticSearchConnection(app, "ESConnection")
)
# pass the connection name ("ESConnection" in our example) to the relevant BSPump object:

self.build(
    bspump.kafka.KafkaSource(app, self, "KafkaConnection"),
    bspump.elasticsearch.ElasticSearchSink(app, self, "ESConnection")
)
ConfigDefaults = {'allowed_bulk_response_codes': '201', 'bulk_out_max_size': 1048576, 'loader_per_url': 4, 'output_queue_max_size': 10, 'password': '', 'timeout': 300, 'url': 'http://localhost:9200/', 'username': ''}
consume(data)[source]
flush()[source]
get_session()[source]
get_url()[source]
class bspump.elasticsearch.ElasticSearchSink(app, pipeline, connection, id=None, config=None)[source]

ElasticSearchSink allows you to insert events into ElasticSearch through POST requests.
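
A sketch of a pipeline using this sink, combining the KafkaSource example above with the "ESConnection" connection registered earlier (the pipeline class name is illustrative):

class KafkaToESPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'messages'}),
            bspump.elasticsearch.ElasticSearchSink(app, self, "ESConnection")
        )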

ConfigDefaults = {'doctype': 'doc', 'index_prefix': 'bspump_', 'max_index_size': 32212254720, 'rollover_mechanism': 'time', 'time_period': 'd', 'timeout': 30}
process(context, event)[source]
class bspump.elasticsearch.ElasticSearchSource(app, pipeline, connection, request_body=None, paging=True, id=None, config=None)[source]

ElasticSearchSource uses the standard Elastic search API to fetch data.

configs

index - Elastic’s index (default is ‘index-*’).

scroll_timeout - Timeout of single scroll request (default is ‘1m’). Allowed time units: https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#time-units

specific parameters

paging - boolean (default is True)

request_body - dictionary described by Elastic’s doc: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html

Default is:

default_request_body = {
        'query': {
                'bool': {
                        'must': {
                                'match_all': {}
                        }
                }
        },
}
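
A sketch of constructing the source with a custom query inside a pipeline (the term query and index name are illustrative):

bspump.elasticsearch.ElasticSearchSource(
    app, self, "ESConnection",
    request_body={'query': {'term': {'status': 'active'}}},
    config={'index': 'myindex-*'}
)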
ConfigDefaults = {'index': 'index-*', 'scroll_timeout': '1m'}
cycle()[source]
class bspump.elasticsearch.ElasticSearchAggsSource(app, pipeline, connection, request_body=None, id=None, config=None)[source]

ElasticSearchAggsSource is used for Elastic’s search aggregations.

configs

index - Elastic’s index (default is ‘index-*’).

specific parameters

request_body - dictionary described by Elastic’s doc: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html

Default is:

default_request_body = {
        'query': {
                'bool': {
                        'must': {
                                'match_all': {}
                        }
                }
        },
}
ConfigDefaults = {'index': 'index-*'}
cycle()[source]
process_aggs(path, aggs_name, aggs)[source]
process_buckets(path, parent, buckets)[source]

Recursive function for processing buckets. It iterates through the keys of the dictionary, looking for ‘buckets’ or ‘value’. If it finds ‘buckets’, it calls itself recursively; if it finds ‘value’, it calls process_aggs and sends an event to process.
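
An illustrative (hypothetical) shape of the aggregation dictionary such a recursion walks, with nested 'buckets' lists and leaf 'value' entries:

aggs = {
    'by_host': {
        'buckets': [
            {'key': 'host-1', 'avg_load': {'value': 0.42}},
            {'key': 'host-2', 'avg_load': {'value': 0.17}},
        ]
    }
}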

class bspump.elasticsearch.ElasticSearchLookup(app, lookup_id, es_connection, config=None)[source]

The lookup that is linked with ES. It provides a mapping (dictionary-like) interface to pipelines. It feeds lookup data from ES using a query. It also has a simple cache to reduce the number of database hits.

configs

index - Elastic’s index

key - field name to match

scroll_timeout - Timeout of single scroll request (default is ‘1m’). Allowed time units: https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#time-units

Example:

class ProjectLookup(bspump.elasticsearch.ElasticSearchLookup):

        async def count(self, database):
                return await database['projects'].count_documents({})

        def find_one(self, database, key):
                return database['projects'].find_one({'_id':key})
ConfigDefaults = {'index': '', 'key': '', 'scroll_timeout': '1m'}
classmethod construct(app, definition: dict)[source]
load()[source]

Return True if the lookup has been changed.

Example:

async def load(self):
    self.set(bspump.load_json_file('./examples/data/country_names.json'))
    return True
