Welcome to BSPump reference documentation!
class bspump.BSPumpApplication(args=None, web_listen=None)[source]
Application object for BSPump.
class bspump.Pipeline(app, id=None, config=None)[source]

Multiple sources

A pipeline can have multiple sources. They are simply passed as a list of sources to the pipeline build() method.
class MyPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            [
                MySource1(app, self),
                MySource2(app, self),
                MySource3(app, self),
            ],
            bspump.common.NullSink(app, self),
        )
ConfigDefaults = {'async_concurency_limit': 1000}
catch_error(exception, event)[source]
Override this method to evaluate pipeline processing errors. Return True for hard errors (stop the pipeline processing) or False for soft errors that will be ignored.
import json


class SampleInternalPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)

        self.build(
            bspump.common.InternalSource(app, self),
            bspump.common.JSONParserProcessor(app, self),
            bspump.common.PPrintSink(app, self)
        )

    def catch_error(self, exception, event):
        if isinstance(exception, json.decoder.JSONDecodeError):
            return False
        return True
ensure_future(coro)[source]
You can use this method to schedule a future task that will be executed in the context of the pipeline. The pipeline also manages the whole lifecycle of the future/task: it collects the future result, discards it, and, most importantly, captures any possible exception, which will then block the pipeline via set_error().
If the number of futures exceeds the configured limit, the pipeline is throttled.
Parameters
coro – the coroutine to be scheduled in the context of the pipeline
Returns
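A minimal sketch of scheduling a background coroutine from a processor; the MyEnricher class and its _notify() coroutine are illustrative assumptions, not part of the BSPump API:

import asyncio
import bspump


class MyEnricher(bspump.Processor):

    def process(self, context, event):
        # Schedule an illustrative background task in the context of the pipeline;
        # the pipeline collects its result and turns exceptions into set_error().
        self.Pipeline.ensure_future(self._notify(event))
        return event

    async def _notify(self, event):
        # Hypothetical long-running asynchronous work
        await asyncio.sleep(0.1)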
async inject(context, event, depth)[source]
The inject method serves to inject events into the pipeline at the depth defined by the depth attribute. Every depth is interconnected with a generator object.
For normal operations, it is highly recommended to use the process method instead (see below).
Parameters
context – the event context
event – the event to be injected
depth – the pipeline depth at which the event is injected
Returns
insert_after(id, processor)[source]
Insert the processor into the pipeline after another processor specified by id.
Returns
True on success, False otherwise (id not found).
insert_before(id, processor)[source]
Insert the processor into the pipeline before another processor specified by id.
Returns
True on success, False otherwise (id not found).
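A sketch of inserting a processor into an already built pipeline; the MyEnricher processor and the target id "JSONParserProcessor" are illustrative assumptions:

# Assuming 'pipeline' was built with a processor whose id is "JSONParserProcessor"
ok = pipeline.insert_after("JSONParserProcessor", MyEnricher(app, pipeline))
if not ok:
    print("Processor with the given id was not found")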
class bspump.PumpBuilder(definition)[source]
PumpBuilder provides an alternative way of creating a pipeline together with its connections, processors and sources.
definition is a path to the JSON file containing a description of the pump. Example of such a file:

{
    "pipelines" : [
        {
            "id": "MyPipeline0",
            "args": {},
            "config": {},
            "sources": [
                {
                    "id": "FileCSVSource",
                    "module": "bspump.file",
                    "class" : "FileCSVSource",
                    "args": {},
                    "config": {"path":"etc/test.csv", "post":"noop"},
                    "trigger": {
                        "module": "bspump.trigger",
                        "class": "OpportunisticTrigger",
                        "id": "",
                        "args": {}
                    }
                }
            ],
            "processors": [
                {
                    "module":"bspump-pumpbuilder",
                    "class": "Processor00",
                    "args": {},
                    "config": {}
                }
            ],
            "sink": {
                "module":"bspump.common",
                "class": "PPrintSink",
                "args": {},
                "config": {}
            }
        }
    ]
}
class bspump.Source(app, pipeline, id=None, config=None)[source]
Each source represents a coroutine/Future/Task that is running in the context of the main loop. The coroutine method main() contains the implementation of each particular source.
A source MUST await the pipeline ready state prior to producing an event. This is accomplished by the await self.Pipeline.ready() call.
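A minimal sketch of a custom source; the event payload and the pacing are illustrative, and the use of the inherited process() coroutine to pass events into the pipeline is an assumption:

import asyncio
import bspump


class MySource(bspump.Source):

    async def main(self):
        while True:
            # A source MUST await the pipeline ready state before producing an event
            await self.Pipeline.ready()
            event = b'hello world'  # illustrative event payload
            await self.process(event)
            await asyncio.sleep(1)  # illustrative pacing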
class bspump.TriggerSource(app, pipeline, id=None, config=None)[source]
This is an abstract source class intended as a base for implementation of 'cyclic' sources such as file readers, SQL extractors etc. You need to provide a trigger class and implement the cycle() method.
A trigger source will stop execution when the pipeline is cancelled (raises concurrent.futures.CancelledError). This typically happens when the program wants to quit in reaction to a signal.
You may also overload the main() method to provide additional parameters for the cycle() method.
async def main(self):
    async with aiohttp.ClientSession(loop=self.Loop) as session:
        await super().main(session)


async def cycle(self, session):
    session.get(...)
class bspump.Generator(app, pipeline, id=None, config=None)[source]
A Generator object is used to generate one or multiple events in an asynchronous way and pass them to the following processors in the pipeline. In the case of a Generator, the user overrides the generate method, not process.
1.) A Generator can iterate through an event to create (generate) derived ones and pass them to the following processors.
Example of a custom Generator class with a generate method:

class MyGenerator(bspump.Generator):

    async def generate(self, context, event, depth):
        for item in event.items:
            await self.Pipeline.inject(context, item, depth)

2.) A Generator can in the same way also generate completely independent events, if necessary. In this way, the generator processes originally synchronous events "out-of-band", i.e. outside of the synchronous processing within the pipeline. A specific implementation of the generator should implement the generate method to process events while performing long-running (asynchronous) tasks such as HTTP requests or an SQL select. The long-running tasks may enrich events with relevant information, such as the output of external calculations. Example of a generate method:
async def generate(self, context, event, depth):
    # Perform possibly long-running asynchronous operation
    async with aiohttp.ClientSession() as session:
        async with session.get("https://example.com/resolve_color/{}".format(event.get("color_id", "unknown"))) as resp:
            if resp.status != 200:
                return
            new_event = await resp.json()

    # Inject a new event into a next depth of the pipeline
    await self.Pipeline.inject(context, new_event, depth)
class bspump.Matrix(app, dtype='float_', id=None, config=None)[source]
Generic Matrix object.
The Matrix structure is organized in the following hierarchical order:
Matrix -> Rows -> Columns -> Cells
Cells have a unified data format across the whole matrix. This format is specified by a dtype. It can be a simple integer or float, but also a complex dictionary type with names and types of the fields.
The description of types that can be used for a dtype of a cell:

Name    Definition
'b'     Byte
'i'     Signed integer
'u'     Unsigned integer
'f'     Floating point
'c'     Complex floating point
'S'     String
'U'     Unicode string
'V'     Raw data

Example: 'i8' stands for int64.
For more details, see https://docs.scipy.org/doc/numpy/reference/arrays.dtypes.html
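For illustration, a structured cell dtype combining several of the type codes above can be declared directly with numpy; the field names used here are hypothetical:

import numpy as np

# A cell with three named fields: a 64-bit signed integer, a 32-bit float
# and a unicode string of up to 16 characters
cell_dtype = np.dtype([
    ('count', 'i8'),
    ('ratio', 'f4'),
    ('label', 'U16'),
])

rows = np.zeros(10, dtype=cell_dtype)  # ten rows with this cell format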
Main object attributes: Array is a numpy ndarray, the actual data representation of the matrix object. ClosedRows is a set where row ids can be stored before deletion during the matrix rebuild.
async analyze()[source]
The Matrix itself can run analyze(). It is not recommended to iterate through the matrix row by row (or cell by cell). Instead, use numpy functions. Examples:
1. You have a vector with n rows. You need only those row indexes where the cell content is more than 10. Use np.where(vector > 10).
2. You have a matrix with n rows and m columns. You need to find out which rows fully consist of zeros. Use np.where(np.all(matrix == 0, axis=1)) to get those row indexes. Instead of np.all() you can use np.any() to get all row indexes where there is at least one zero.
3. Use np.mean(matrix, axis=1) to get the means of all rows.
4. Useful numpy functions: np.unique(), np.sum(), np.argmin(), np.argmax().
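The numpy calls mentioned above, demonstrated on a small standalone array (not tied to the Matrix API):

import numpy as np

vector = np.array([3, 15, 7, 42])
matrix = np.zeros((4, 3))
matrix[2] = [1.0, 2.0, 3.0]

# 1. Row indexes where the cell content is more than 10
(large_rows,) = np.where(vector > 10)

# 2. Row indexes that fully consist of zeros
(zero_rows,) = np.where(np.all(matrix == 0, axis=1))

# 3. Means of all rows
row_means = np.mean(matrix, axis=1)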
class bspump.Lookup(app, id=None, config=None, lazy=False)[source]
Lookups serve for fast data searching in lists of key-value type. They can subsequently be localized and used in pipeline objects (processors and the like). Each lookup requires a statically or dynamically created value list.
If the “lazy” parameter in the constructor is set to True, no load method is called and the user is expected to call it when necessary.
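A sketch of the lazy mode; MyLookup stands for a hypothetical subclass that implements load() (see the example below):

async def prepare_lookup(app):
    # With lazy=True, load() is not called automatically
    lookup = MyLookup(app, "MyLookup", lazy=True)
    # The user is expected to trigger loading explicitly when the data is needed
    await lookup.load()
    return lookup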
ConfigDefaults = {'master_lookup_id': '', 'master_timeout': 30, 'master_url': ''}
abstract async load() → bool[source]
Return True if the lookup has been changed.
Example:
async def load(self):
    self.set(bspump.load_json_file('./examples/data/country_names.json'))
    return True
class bspump.kafka.KafkaConnection(app, id=None, config=None)[source]
KafkaConnection serves to connect a BSPump application with an instance of the Apache Kafka messaging system. It can later be used by processors to consume or provide user-defined messages.
config = {"compression_type": "gzip"}

app = bspump.BSPumpApplication()
svc = app.get_service("bspump.PumpService")
svc.add_connection(
    bspump.kafka.KafkaConnection(app, "KafkaConnection", config)
)
ConfigDefaults options:
compression_type: Kafka supports several compression types: gzip, snappy and lz4. This option needs to be specified in the Kafka Producer only; the Consumer will decompress automatically.
ConfigDefaults = {'bootstrap_servers': 'localhost:9092', 'compression_type': ''}
class bspump.kafka.KafkaSource(app, pipeline, connection, id=None, config=None)[source]
The KafkaSource object consumes messages from an Apache Kafka system, which is configured in the KafkaConnection object. It then passes them to other processors in the pipeline.

class KafkaPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'messages'}),
            bspump.kafka.KafkaSink(app, self, "KafkaConnection", config={'topic': 'messages2'}),
        )

To ensure that after a restart the pump will continue receiving messages where it left off, group_id has to be provided in the configuration. When the group_id is set, the consumer group is created and the Kafka server will then operate in the producer-consumer mode. It means that every consumer with the same group_id will be assigned a unique set of partitions, hence all messages will be divided among them and thus be unique.
Long-running synchronous operations should be avoided or placed inside the OOBGenerator in an asynchronous way, or run on a thread using the ASAB Proactor service (see the bspump-oob-proactor.py example in the "examples" folder). Otherwise, session_timeout_ms should be raised to prevent Kafka from disconnecting the consumer from the partition, thus causing a rebalance.
ConfigDefaults = {'api_version': 'auto', 'auto_offset_reset': 'earliest', 'client_id': 'BSPump-KafkaSource', 'consumer_timeout_ms': '', 'event_block_size': 1000, 'event_idle_time': 0.01, 'get_timeout_ms': 20000, 'group_id': '', 'max_partition_fetch_bytes': '', 'request_timeout_ms': '', 'retry': 20, 'session_timeout_ms': 10000, 'topic': ''}
class bspump.kafka.KafkaSink(app, pipeline, connection, key_serializer=None, id=None, config=None)[source]
KafkaSink is a sink processor that forwards the event to an Apache Kafka instance specified by a KafkaConnection object.
KafkaSink expects bytes as input. If the input is a string or a dictionary, it is automatically transformed to bytes using the encoding charset specified in the configuration.
class KafkaPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'messages'}),
            bspump.kafka.KafkaSink(app, self, "KafkaConnection", config={'topic': 'messages2'}),
        )

There are two ways to use KafkaSink:
- Specify a single topic in the KafkaSink config - topic - to be used for all events in the pipeline.
- Specify the topic separately for each event in the event context - context['kafka_topic']. The topic from the configuration is then used as the default topic.
To provide business logic for event distribution, you can create a topic selector processor. Processor example:
class KafkaTopicSelector(bspump.Processor):

    def process(self, context, event):
        if event.get("weight") > 10:
            context["kafka_topic"] = "heavy"
        else:
            context["kafka_topic"] = "light"
        return event

Every Kafka message can be a key:value pair. The key is read from the event context - context['kafka_key']. If kafka_key is not provided, the key defaults to None.
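Analogously, the message key can be set per event from a processor. A minimal sketch; the "user_id" event field is a hypothetical example, and encoding the key to bytes mirrors the byte keys used by KafkaKeyFilter below:

class KafkaKeySelector(bspump.Processor):

    def process(self, context, event):
        user_id = event.get("user_id")  # hypothetical event field
        if user_id is not None:
            # KafkaSink reads the message key from the event context
            context["kafka_key"] = str(user_id).encode("utf-8")
        return event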
ConfigDefaults = {'acks': '', 'api_version': '', 'client_id': '', 'connections_max_idle_ms': '', 'enable_idempotency': '', 'encoding': 'utf-8', 'key_serializer': '', 'linger_ms': '', 'max_batch_size': '', 'max_request_size': '', 'metadata_max_age_ms': '', 'output_queue_max_size': 100, 'request_timeout_ms': '', 'retry_backoff_ms': '', 'send_backoff_ms': '', 'topic': '', 'transaction_timeout_ms': '', 'transactional_id': '', 'value_serializer': ''}
class bspump.kafka.KafkaKeyFilter(app, pipeline, keys, id=None, config=None)[source]
KafkaKeyFilter reduces the incoming event stream from Kafka based on a key provided in each event.
Every Kafka message has a key. KafkaKeyFilter selects only those events whose key matches one of the provided 'keys'; other events are discarded.
Set the filtering keys as a parameter (in bytes) in the KafkaKeyFilter constructor.
KafkaKeyFilter is meant to be inserted after KafkaSource in a Pipeline.
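A minimal pipeline sketch placing KafkaKeyFilter right after KafkaSource; the topic name and the byte keys are illustrative, and passing the keys as a list is an assumption:

class FilteredKafkaPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'messages'}),
            # Only events whose Kafka key is b'alpha' or b'beta' pass through
            bspump.kafka.KafkaKeyFilter(app, self, [b'alpha', b'beta']),
            bspump.common.PPrintSink(app, self)
        )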
class bspump.influxdb.InfluxDBSink(app, pipeline, connection, id=None, config=None)[source]
InfluxDBSink is a sink processor that stores the event into an InfluxDB database specified in the InfluxDBConnection object.
class SamplePipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.socket.TCPStreamSource(app, self, config={'port': 7000}),
            bspump.influxdb.InfluxDBSink(app, self, "InfluxConnection1")
        )
class bspump.influxdb.InfluxDBConnection(app, id=None, config=None)[source]
InfluxDBConnection serves to connect a BSPump application with an InfluxDB database. The InfluxDB server is accessed via URL, and the database is specified using the db parameter in the configuration.
app = bspump.BSPumpApplication()
svc = app.get_service("bspump.PumpService")
svc.add_connection(
    bspump.influxdb.InfluxDBConnection(app, "InfluxConnection1")
)
ConfigDefaults = {'db': 'mydb', 'output_bucket_max_size': 1000000, 'output_queue_max_size': 10, 'timeout': 30, 'url': 'http://localhost:8086/'}
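A sketch of overriding these defaults when adding the connection (following the example above); the host and database name are illustrative assumptions:

svc.add_connection(
    bspump.influxdb.InfluxDBConnection(app, "InfluxConnection1", config={
        "url": "http://influxdb.example.com:8086/",  # hypothetical InfluxDB host
        "db": "metrics",                             # hypothetical database name
    })
)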
class bspump.elasticsearch.ElasticSearchConnection(app, id=None, config=None)[source]
ElasticSearchConnection allows your ES source, sink or lookup to connect to an ElasticSearch instance.
usage:
# adding connection to PumpService
svc = app.get_service("bspump.PumpService")
svc.add_connection(
    bspump.elasticsearch.ElasticSearchConnection(app, "ESConnection")
)
# pass the connection name ("ESConnection" in our example) to the relevant BSPump objects:
self.build(
    bspump.kafka.KafkaSource(app, self, "KafkaConnection"),
    bspump.elasticsearch.ElasticSearchSink(app, self, "ESConnection")
)
ConfigDefaults = {'allowed_bulk_response_codes': '201', 'bulk_out_max_size': 1048576, 'loader_per_url': 4, 'output_queue_max_size': 10, 'password': '', 'timeout': 300, 'url': 'http://localhost:9200/', 'username': ''}
class bspump.elasticsearch.ElasticSearchSink(app, pipeline, connection, id=None, config=None)[source]
ElasticSearchSink allows you to insert events into ElasticSearch through POST requests.
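A minimal pipeline sketch that combines the connection example above with this sink; the Kafka source, the topic and the index_prefix override are illustrative assumptions:

class ElasticPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            # illustrative source; any source producing events can be used
            bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'messages'}),
            # 'index_prefix' is one of the ConfigDefaults listed below
            bspump.elasticsearch.ElasticSearchSink(app, self, "ESConnection", config={'index_prefix': 'myapp_'})
        )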
ConfigDefaults = {'doctype': 'doc', 'index_prefix': 'bspump_', 'max_index_size': 32212254720, 'rollover_mechanism': 'time', 'time_period': 'd', 'timeout': 30}
class bspump.elasticsearch.ElasticSearchSource(app, pipeline, connection, request_body=None, paging=True, id=None, config=None)[source]
ElasticSearchSource uses the standard Elastic search API to fetch data.
configs
index - Elastic's index (default is 'index-*').
scroll_timeout - Timeout of a single scroll request (default is '1m'). Allowed time units: https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#time-units
specific parameters
paging - boolean (default is True)
request_body - dictionary described by Elastic's doc: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html
Default is:

default_request_body = {
    'query': {
        'bool': {
            'must': {
                'match_all': {}
            }
        }
    },
}
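A sketch of using the source with a custom request_body. The query, the index pattern and the trigger are illustrative assumptions; attaching a trigger with .on() follows the pattern of other cyclic (trigger-driven) sources:

class QueryPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        request_body = {
            'query': {
                'match': {'color': 'blue'}  # illustrative query
            }
        }
        self.build(
            bspump.elasticsearch.ElasticSearchSource(
                app, self, "ESConnection",
                request_body=request_body,
                config={'index': 'my-index-*'}  # illustrative index pattern
            ).on(bspump.trigger.OpportunisticTrigger(app)),
            bspump.common.PPrintSink(app, self)
        )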
ConfigDefaults = {'index': 'index-*', 'scroll_timeout': '1m'}
class bspump.elasticsearch.ElasticSearchAggsSource(app, pipeline, connection, request_body=None, id=None, config=None)[source]
ElasticSearchAggsSource is used for Elastic's search aggregations.
configs
index - Elastic's index (default is 'index-*').
specific parameters
request_body - dictionary described by Elastic's doc: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html
Default is:

default_request_body = {
    'query': {
        'bool': {
            'must': {
                'match_all': {}
            }
        }
    },
}
ConfigDefaults = {'index': 'index-*'}
class bspump.elasticsearch.ElasticSearchLookup(app, connection, id=None, config=None, cache=None)[source]
A lookup that is linked with ES. It provides a mapping (dictionary-like) interface to pipelines. It feeds lookup data from ES using a query. It also has a simple cache to reduce the number of database hits.
configs
index - Elastic’s index
key - field name to match
scroll_timeout - Timeout of a single scroll request (default is '1m'). Allowed time units: https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#time-units
Example:
class ProjectLookup(bspump.elasticsearch.ElasticSearchLookup):

    async def count(self, database):
        return await database['projects'].count_documents({})

    def find_one(self, database, key):
        return database['projects'].find_one({'_id': key})
ConfigDefaults = {'index': '', 'key': '', 'scroll_timeout': '1m'}
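A sketch of instantiating the lookup against an existing ElasticSearchConnection; the index and key values are illustrative assumptions:

es_connection = bspump.elasticsearch.ElasticSearchConnection(app, "ESConnection")
lookup = bspump.elasticsearch.ElasticSearchLookup(
    app, es_connection, "MyESLookup",
    config={
        'index': 'users',    # illustrative index
        'key': 'user_name',  # illustrative field to match
    }
)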