Advanced topics

Advanced selector expressions

New in version 0.12.0.

Instead of string-only selector expressions, you may now use complex dictionary and/or list constructs in your yaml to define a selector expression. If you use a dictionary or a list make sure to provide “real” selectors as a lambda expression, so the evaluator can decide if this is a string literal or an expression to evaluate.

The configuration below will repeat

{'hello': 'Hello', 'words': ['World', 'Moon', 'Mars']}
tasks:
  - name: selector
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        wait: 1
        repeat: "Hello World Moon Mars"
    push:
      - plugin: pnp.plugins.push.simple.Echo
        selector:
          hello: "lambda payload: payload.split(' ')[0]"
          words:
            - "lambda payload: payload.split(' ')[1]"
            - "lambda payload: payload.split(' ')[2]"
            - "lambda payload: payload.split(' ')[3]"

Prior to the implementation of the advanced selector feature your expressions would have probably looked similar to this:

dict(hello=payload.split(' ')[0], words=[payload.split(' ')[1], payload.split(' ')[2], payload.split(' ')[3]])

The first one is more readable, isn’t it?

A more complex example showcasing literals and lambda expressions in more depth:

tasks:
  - name: selector
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        interval: 1s
        repeat: "Hello World"
    push:
      - plugin: pnp.plugins.push.simple.Echo
        # Returns: 'World'
        # no complex structure. Evaluator assumes that this is an expression -> you do not need a lambda
        selector: "str(payload.split(' ')[0])"
      - plugin: pnp.plugins.push.simple.Echo
        # Returns {'header': 'this is a header', 'data': 'World', 'Hello': 'World'}
        selector:
          # Just string literals
          header: this is a header
          # Value is lambda and therefore evaluated
          data: "lambda data: data.split(' ')[1]"
          # Both are lambdas and therefore evaluated
          "lambda data: str(data.split(' ')[0])": "lambda data: data.split(' ')[1]"
      - plugin: pnp.plugins.push.simple.Echo
        # Returns ['foo', 'bar', 'Hello', 'World']
        selector:
          - foo  # String literal
          - bar  # String literal
          # Lambda -> evaluate the expression
          - "lambda d: d.split(' ')[0]"
          # Lambda -> evaluate the expression
          - "lambda d: d.split(' ')[1]"

API

New in version 0.24.0.

You can explicitly activate an rest-api to perform various stuff:

Determine if everything is up and running:

curl -X GET "http://localhost:9999/health"

Retrieve the current python and pnp version:

curl -X GET "http://localhost:9999/version"

Change the log level at runtime. Useful if you want to track down an issue. Example:

curl -X POST "http://localhost:9999/loglevel?level=DEBUG"

Retrieve prometheus compliant metrics:

curl -X GET "http://localhost:9999/metrics"

See the openapi specification in your browser: http://localhost:9999/docs

Trigger tasks manually (even when the task has some schedule assigned):

curl -X POST "http://localhost:9999/trigger?task=<task_name>"

Note

You need to explicitly enable the /metrics endpoint while configuring your api. Please see the example below for reference.

api:
  port: 9999  # API listens on port 9999; mandatory
  endpoints:  # Optional
    # Enable metrics endpoint: http://localhost:9999/metrics, default is false.
    metrics: true
tasks:
  - name: task
    pull:
      plugin: pnp.plugins.pull.monitor.Stats
      args:
        instant_run: true
        interval: "*/1 * * * *"
    push:
      - plugin: pnp.plugins.push.simple.Echo

To create a task with no schedule assigned you can pass nothing to the interval argument of your task. This way the task will not be scheduled automatically. The only way to run it is to do it explicitly via the api.

api:
  port: 9999
  endpoints:
    metrics: false
tasks:
  - name: my_task
    pull:
      plugin: pnp.plugins.pull.monitor.Stats
      args:
        # No schedule -> Execution: `curl -X POST http://localhost:9999/trigger?task=my_task`
        interval:
    push:
      - plugin: pnp.plugins.push.simple.Echo

Augment configuration

Deprecated since version 0.28.0: Use YAML Tags. dictmentor for configuration augmentation will be removed in a future release.

You can augment the configuration by extensions from the dictmentor package. Please see dictmentor on Github for further reference.

The dictmentor instance will be instantiated with the following code and thus the following extensions:

from dictmentor import DictMentor, ext
return DictMentor(
  ext.Environment(fail_on_unset=True),
  ext.ExternalResource(base_path=os.path.dirname(config_path)),
  ext.ExternalYamlResource(base_path=os.path.dirname(config_path))
)

The following configuration will demonstrate the configuration augmentation in more depth:

# Uses the dictmentor package to augment the configuration
# by dictmentor extensions.
# Make sure to export the environment variable to echo:
# export MESSAGE="Hello World"

tasks:
  - name: dictmentor
    pull:
      external: repeat.pull
    push:
      - external: echo.push
      - external: nop.push
# Contents of repeat.pull
plugin: pnp.plugins.pull.simple.Repeat
args:
  wait: 1
  repeat: "{{env::MESSAGE}}"
# Contents of echo.push
plugin: pnp.plugins.push.simple.Echo
# Contents of nop.push
plugin: pnp.plugins.push.simple.Nop

Engines

In version 0.22.0 I’ve decided to remove all engines except for the Async engine. Due to some tests this engine is amazingly stable, has great performance and you do not need to think about synchronizing parallel tasks so much.

This decision was basically driven from a maintenance view of perspective. In the future I like to add some infrastructural code like an api to communicate with the engine. And I will not be able to integrate necessary changes to all of the engines.

By default the AsyncEngine is used implicitly:

tasks:  # Implicit use of the AsyncEngine
  - name: async
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        interval: 1s
        repeat: "Hello World"
    push:
      - plugin: pnp.plugins.push.simple.Echo

You can add it explicitly though. This might be useful in the future IF I decide to add variants of the AsyncEngine or if you want to use a RetryHandler.

engine: !engine  # Use the AsyncEngine explicitly
  type: pnp.engines.AsyncEngine
  retry_handler: !retry
    type: pnp.engines.SimpleRetryHandler
tasks:
  - name: async
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        interval: 1s
        repeat: "Hello World"
    push:
      - plugin: pnp.plugins.push.simple.Echo

Logging

New in version 0.11.0.

You can use different logging configurations in two ways: Either by specify the logging configuration when starting pnp via the console or by setting the environment variable PNP_LOG_CONF and then run pnp.

# Specify explicitly when starting pnp
pnp --log=<logging_configuration> <pnp_configuration>

# Specify by environment variable
export PNP_LOG_CONF=<logging_configuration>
pnp <pnp_configuration>

If you do not specify a logging configuration explicitly pnp will search for a logging.yaml 1. in the current working directory 2. in the directory where your pnp configuration file is located

You can disable this automatic and implicit logging configuration probing by starting pnp by passing --no-log-probe:

pnp --no-log-probe <pnp_configuration>

A simple logging configuration that will log severe errors to a separate rotating log file looks like this:

version: 1
disable_existing_loggers: False

formatters:
    simple:
        format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

handlers:
    console:
        class: logging.StreamHandler
        level: DEBUG
        formatter: simple
        stream: ext://sys.stdout

    error_file_handler:
        class: logging.handlers.RotatingFileHandler
        level: ERROR
        formatter: simple
        filename: errors.log
        maxBytes: 10485760 # 10MB
        backupCount: 20
        encoding: utf8

root:
    level: INFO
    handlers: [console, error_file_handler]

Payload unwrapping

Sometimes a pull returns a list of items or you created a list of readings by using the selector. As long as a push can process a list as an input everything is fine. But now imagine that you want to process each item of that list individually. Unwrapping to the rescue. Unwrapping will exactly allow you what was asked for above: Pass each item of a list to a push individually.

tasks:
  - name: unwrapping
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        interval: 1s
        repeat:
          - 1
          - 2
          - 3
    push:
      - plugin: pnp.plugins.push.simple.Echo
        unwrap: true  # Magic!

But sometimes you want to add items to the list before processing it item by item. You can do this by using a push.simple.Nop properly.

tasks:
  - name: unwrapping
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        interval: 1s
        repeat:
          - 1
          - 2
          - 3
    push:
      - plugin: pnp.plugins.push.simple.Nop
        # You can add items to a list ...
        selector: "data + [4, 5, 6]"
        # ... and not unwrap (which is the default) ...
        unwrap: false
        # ... and use unwrap: true in a dependant push
        deps:
          - plugin: pnp.plugins.push.simple.Echo
            unwrap: true

Retry handler

By using the explicit form of the engine specification you can specify a so called RetryHandler as well. A RetryHandler controls what happens if your push unexpectedly exits or raises an error.

NoRetryHandler

Will not retry ro run the pull if it fails. If all pulls exit for some reason pnp will exit as well.

engine: !engine
  type: pnp.engines.AsyncEngine
  retry_handler: !retry
    type: pnp.engines.NoRetryHandler
tasks:
  - name: async
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        interval: 1s
        repeat: "Hello World"
    push:
      - plugin: pnp.plugins.push.simple.Echo

SimpleRetryHandler

In addition to the NoRetryHandler this handler will retry after a specified amount of time configured by passing retry_wait (default is 60 seconds).

engine: !engine  # Use the AsyncEngine explicitly
  type: pnp.engines.AsyncEngine
  retry_handler: !retry
    type: pnp.engines.SimpleRetryHandler
    retry_wait: 60s  # Default is 60 seconds as well
tasks:
  - name: async
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        interval: 1s
        repeat: "Hello World"
    push:
      - plugin: pnp.plugins.push.simple.Echo

LimitedRetryHandler

In addition to retrying the pull you can configure how many retries are allowed by passing max_retries (default is 3). Each failed pull execution will raise the counter and the counter will not reset.

engine: !engine  # Use the AsyncEngine explicitly
  type: pnp.engines.AsyncEngine
  retry_handler: !retry
    type: pnp.engines.LimitedRetryHandler
    retry_wait: 60s  # Default is 60 seconds as well
    # Retry to run the pull 3 times before giving up
    max_retries: 3
tasks:
  - name: async
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        interval: 1s
        repeat: "Hello World"
    push:
      - plugin: pnp.plugins.push.simple.Echo

AdvancedRetryHandler

In addition to the LimitedRetryHandler the counter will be reset if the pull has ran successful for a specific amount of time by passing reset_retry_threshold (default is 60 seconds).

engine: !engine  # Use the AsyncEngine explicitly
  type: pnp.engines.AsyncEngine
  retry_handler: !retry
    type: pnp.engines.AdvancedRetryHandler
    retry_wait: 60s  # Default is 60 seconds as well
    # Retry to run the pull 3 times before giving up
    max_retries: 3
    # Will reset the retry count after 60 seconds
    # of successful execution of the pull
    reset_retry_threshold: 60s
tasks:
  - name: async
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        interval: 1s
        repeat: "Hello World"
    push:
      - plugin: pnp.plugins.push.simple.Echo

Note

If no RetryHandler is explicitly specified the AdvancedRetryHandler will be used. The instance created will use the default values for it’s arguments.

Suppress push

Sometimes you want to suppress (do not execute) a push based on the actual incoming payload from a pull. To achieve this all you have to do is to emit the special sentinel SUPPRESS from your selector expression.

tasks:
  - name: selector
    pull:
      plugin: pnp.plugins.pull.simple.Count
      args:
        interval: 1s
    push:
      - plugin: pnp.plugins.push.simple.Echo
        # Only print the counter if it is even. Suppress odd ones.
        selector: "payload if payload % 2 == 0 else SUPPRESS"

UDF Throttle

Consider the following situation: You have a selector that uses a udf to fetch a state from an external system.

The state won’t change so often, but your selector will fetch the state every time a pull transports a payload to the push. You want to decrease the load on the external system and you want to increase throughput.

Throttling to the rescue. Specifying throttle when instantiating your udf will manage that your udf will only call the external system once and cache the result. Subsequent calls will either return the cached result or call the external system again when a specified time has passed since the last call that actually fetched a result from the external system.

udfs:
  - name: count  # Instantiate a Counter user defined function
    plugin: pnp.plugins.udf.simple.Counter
    args:  # The presence of args tells pnp to instantiate a Counter - important because it has a state (the actual count)
      init: 1
      # Will only call the counter if 10 seconds passed between current call and last call.
      # In the meantime a cached result will be returned.
      throttle: 10s
tasks:
  - name: hello-world
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        interval: 1s
        repeat: "Hello World"
    push:
      - plugin: pnp.plugins.push.simple.Echo
        selector:
          counter: "lambda d: count()"

YAML Tags

You can use yaml tags to augment your yaml configuration. Right now the following tags are supported:

  • !include: To incorporate another yaml file into your configuration
  • !env: To inject a environment variable. Use := to denote a default value. The configuration loading process will fail if the environment variable is not set and no default is given.

The following example will demonstrate the yaml tags in more depth:

# Use !include to incorporate another yaml file into this one

tasks:
  - name: tags
    pull: !include _repeat.yaml
    push:
      - !include _echo.yaml
      - !include _nop.yaml
# Contents of _repeat.yaml
plugin: pnp.plugins.pull.simple.Repeat
args:
  wait: 1
  repeat: !env MESSAGE:=Hello World
# Contents of _echo.yaml
plugin: pnp.plugins.push.simple.Echo
# Contents of _nop.yaml
plugin: pnp.plugins.push.simple.Nop