Building Blocks

Pull

The pull is the primary building block for fetching data from various sources.

Some examples:

  • Start an FTP server and wait for newly uploaded files
  • Listen for files being created, updated or deleted in the file system
  • Listen to an MQTT broker for new messages on a specific topic
  • Periodically poll the Fitbit API to fetch the current step count
  • Periodically poll the host for statistics (like CPU / RAM usage)

As you can see, pulls can either

  • react to one or multiple events (like files being created, updated or deleted)
  • or periodically poll a source

to provide data to their downstream: the push.

# Counts from 0 to infinity. Emits a new counter every second.

tasks:
  - name: pull  # Task name -> Glues together a pull and 1..n pushes
    pull:
      plugin: pnp.plugins.pull.simple.Count  # Fully qualified path to the pull
      args:
        interval: 1s  # Every second
    push:
      - plugin: pnp.plugins.push.simple.Echo
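To make the pull/push split concrete, here is a minimal Python sketch of a polling pull feeding a push, loosely mirroring the Count + Echo config above. All names (count_pull, echo_push, run_task) are hypothetical and this is not how pnp is implemented internally; the sketch runs pushes sequentially and stops after a fixed number of values so it terminates.

```python
import itertools
import time
from typing import Callable, Iterator, List


def count_pull(interval: float) -> Iterator[int]:
    """Hypothetical polling pull: yields 0, 1, 2, ... waiting `interval` seconds between values."""
    for counter in itertools.count():
        yield counter
        time.sleep(interval)


def echo_push(payload: object) -> object:
    """Hypothetical push: prints the payload and hands it back."""
    print(payload)
    return payload


def run_task(pull: Iterator[int], pushes: List[Callable[[object], object]], limit: int) -> List[object]:
    """Feed the first `limit` values of a pull into every push and collect the results."""
    results = []
    for payload in itertools.islice(pull, limit):
        for push in pushes:
            results.append(push(payload))
    return results


# Like the config above, but with a short interval and only three values.
run_task(count_pull(interval=0.1), [echo_push], limit=3)  # prints 0, 1, 2
```

A real pull would keep running until the task is stopped; the limit exists only to keep the sketch finite.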

Push

A push simply processes the downstream data from a pull.

Some examples:

  • Publish a message to a specific topic on an MQTT broker
  • Send a templated notification via Slack
  • Execute a script using derived arguments
  • Send an HTTP request to a server
  • Simply echo the data to the console (nice for debugging)

# The first push will publish the counter to the 'home/counter/state' topic
# on a locally running mqtt broker.
# The second push will print the counter to the console.

tasks:
  - name: push
    pull:
      plugin: pnp.plugins.pull.simple.Count
    push:
      - plugin: pnp.plugins.push.mqtt.Publish  # Fully qualified path of the push
        args:  # Arguments
          host: localhost
          topic: home/counter/state
          port: 1883
          retain: true
      # You can specify more than one push.
      # Multiple pushes will be executed in parallel.
      - plugin: pnp.plugins.push.simple.Echo
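The idea of several pushes receiving the same payload in parallel can be sketched in Python with a thread pool. This is an illustration under assumptions, not pnp's scheduler: publish_push and echo_push are hypothetical stand-ins (the publish one only builds a string instead of talking to a broker).

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def publish_push(payload: int) -> str:
    """Hypothetical stand-in for an MQTT publish: returns what it would send."""
    return "home/counter/state <- {}".format(payload)


def echo_push(payload: int) -> str:
    """Hypothetical stand-in for the Echo push."""
    print(payload)
    return str(payload)


def fan_out(payload: int, pushes: List[Callable[[int], str]]) -> List[str]:
    """Hand the same payload to every push concurrently; results keep the push order."""
    with ThreadPoolExecutor(max_workers=max(1, len(pushes))) as pool:
        futures = [pool.submit(push, payload) for push in pushes]
        return [future.result() for future in futures]


results = fan_out(42, [publish_push, echo_push])
```

Because each push gets its own worker, a slow push (for example a network call) does not delay the others.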

Selector

Sometimes the output of a pull needs to be transformed before the downstream push can handle it. Selectors to the rescue: a selector transforms the output of a pull using pure Python code.

# The selector will transform the output to 'x is Even' or 'x is Odd'
# when the counter is even or odd, respectively.

tasks:
  - name: selector
    pull:
      plugin: pnp.plugins.pull.simple.Count
      args:
        interval: 1s
    push:
      - plugin: pnp.plugins.push.simple.Echo
        # The selector is written in pure python
        selector: "'{} is {}'.format(payload, 'Even' if payload % 2 == 0 else 'Odd')"

Easy as that. The incoming data can be referenced via either data or payload.
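To see what the selector string does, here is a small Python sketch that evaluates it with the incoming value bound to both payload and data. The helper apply_selector is hypothetical and only mimics the behavior described above; it is not pnp's selector engine.

```python
def apply_selector(expression: str, payload: object) -> object:
    """Evaluate a selector expression with the payload bound to both `payload` and `data`."""
    # Both names point to the same value; eval() adds the builtins (str, float, ...) itself.
    names = {"payload": payload, "data": payload}
    return eval(expression, names)


selector = "'{} is {}'.format(payload, 'Even' if payload % 2 == 0 else 'Odd')"
for counter in range(3):
    print(apply_selector(selector, counter))  # 0 is Even / 1 is Odd / 2 is Even
```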

Dependencies

By default, all defined pushes execute in parallel when new data arrives. Sometimes, however, it would be useful to chain pushes together, so that the output of one push becomes the input of the next. The good thing is: yes, we can, by listing the downstream pushes under deps.

# Executes the command `echo hello world` at the shell level
# and passes the exit code, stdout and stderr to the next
# downstream push.

tasks:
  - name: dependencies
    pull:
      plugin: pnp.plugins.pull.simple.Count
      args:
        interval: 1s
    push:
      plugin: pnp.plugins.push.simple.Execute
      args:
        command: "echo hello world"
        capture: true  # Capture exit code, stdout and stderr
      deps:
        - plugin: pnp.plugins.push.simple.Echo
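The chaining behavior can be sketched in Python: the first push runs the shell command and its result (not the original counter) is what the dependent push receives. The functions are hypothetical, and the result keys (return_code, stdout, stderr) are illustrative only; the keys pnp's Execute push actually emits may differ.

```python
import subprocess
from typing import Any, Callable, Dict, List


def execute_push(payload: Any, command: str = "echo hello world") -> Dict[str, Any]:
    """Hypothetical Execute-like push: runs `command` in a shell and captures its result."""
    completed = subprocess.run(command, shell=True, capture_output=True, text=True)
    # Illustrative key names; not necessarily those used by pnp.
    return {
        "return_code": completed.returncode,
        "stdout": completed.stdout.strip(),
        "stderr": completed.stderr.strip(),
    }


def echo_push(payload: Any) -> Any:
    """Hypothetical Echo-like push."""
    print(payload)
    return payload


def run_with_deps(payload: Any, push: Callable[[Any], Any], deps: List[Callable[[Any], Any]]) -> List[Any]:
    """Run `push`, then feed its output (not the original payload) to each dependent push."""
    output = push(payload)
    return [dep(output) for dep in deps]


run_with_deps(0, execute_push, [echo_push])
```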

Envelope

New in version 0.7.0.

By using an envelope it is possible to change the behavior of pushes at runtime. A good example is the pnp.plugins.push.mqtt.Publish plugin, where you can override the topic the message is published to.

This is easiest to understand with a practical example:

# Uses an advanced selector to override push arguments during
# runtime

tasks:
  - name: envelope
    pull:
      plugin: pnp.plugins.pull.simple.Count
      args:
        interval: 1s
    push:
      - plugin: pnp.plugins.push.mqtt.Publish
        # This is an advanced selector. See section Advanced Selector for details
        selector:
          # This is the actual counter
          payload: "lambda payload: payload"
          # This configures the topic based on the counter payload
          # It can be either counter/even or counter/odd
          topic: "lambda payload: 'counter/{}'.format('even' if payload % 2 == 0 else 'odd')"
        args:
          host: localhost
          topic: home/counter/state
          port: 1883
          retain: true

How does this work?

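If the emitted or transformed payload (via a selector) is a dictionary that contains the key data or payload, the pipeline assumes that this key holds the actual payload and that all other keys represent the so-called envelope. In the example above, the topic key in the envelope overrides the topic argument of the push.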
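The splitting rule can be sketched in Python as follows. The helpers evaluate_advanced_selector and split_envelope are hypothetical; in particular, checking data before payload when both are present is an assumption of this sketch, not documented pnp behavior. Merging the envelope over the configured args shows how the topic gets overridden.

```python
from typing import Any, Dict, Tuple


def evaluate_advanced_selector(selector: Dict[str, str], payload: Any) -> Dict[str, Any]:
    """Evaluate each 'lambda payload: ...' string and call it with the incoming payload."""
    return {key: eval(source)(payload) for key, source in selector.items()}


def split_envelope(result: Any) -> Tuple[Any, Dict[str, Any]]:
    """Split a selector result into (payload, envelope).

    Assumption: 'data' is checked before 'payload'; this tie-break is illustrative only.
    """
    if isinstance(result, dict):
        for key in ("data", "payload"):
            if key in result:
                envelope = {k: v for k, v in result.items() if k != key}
                return result[key], envelope
    # No payload key: the whole result is the payload and there is no envelope.
    return result, {}


args = {"host": "localhost", "topic": "home/counter/state", "port": 1883, "retain": True}
selector = {
    "payload": "lambda payload: payload",
    "topic": "lambda payload: 'counter/{}'.format('even' if payload % 2 == 0 else 'odd')",
}
payload, envelope = split_envelope(evaluate_advanced_selector(selector, 3))
effective_args = {**args, **envelope}  # envelope keys override the configured args
print(payload, effective_args["topic"])  # 3 counter/odd
```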

Note

Envelopes are easy to construct using the Advanced Selector Expression.

UDFs

New in version 0.14.0.

When transforming the output of a pull with a selector, you sometimes need to call Python functions to perform certain operations. Out of the box, a selector provides only a few basic functions, like type conversions (str, float, …).

The good news: you can register Python builtin functions, special user defined functions (UDFs) and your own Python functions, and then use them in your selectors.

Besides providing function logic, UDFs can carry state and special behavior, like out-of-the-box throttling.

udfs:
  # Register the builtin str callable as my_str
  - name: my_str
    plugin: str
  # Register a custom callable
  - name: my_on_off
    plugin: pnp.utils.on_off
  # Instantiate a Counter user defined function
  - name: count
    plugin: pnp.plugins.udf.simple.Counter
    # The presence of args tells pnp to instantiate a Counter
    # -> important because it has a state (the actual count)
    args:
      init: 1
  - name: count0
    plugin: pnp.plugins.udf.simple.Counter
    # Even without an argument (init is 0 by default), the args key is important:
    # it tells pnp to instantiate a Counter. Otherwise it is assumed to be a callable
    args:
  - name: count_err
    # Without args: each call to count_err actually calls the __init__ of the Counter class
    plugin: pnp.plugins.udf.simple.Counter

tasks:
  - name: hello-world
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        interval: 1s
        repeat: "Hello World"
    push:
      - plugin: pnp.plugins.push.simple.Echo
        selector:
          on_off: "lambda d: my_on_off(True)"
          str: "lambda d: my_str(d)"
          cnt0: "lambda d: count0()"
          cnt1: "lambda d: count()"
          # This actually creates a new Counter instance instead of counting
          cnterr: "lambda d: count_err(name='error')"
          data: "lambda d: d"
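Why args matters is easiest to see in plain Python: registering an instance gives one shared, stateful object, while registering the class means every call constructs a new object. The Counter class below is a hypothetical stand-in (it returns the current value and then increments, which is an assumption), not the source of pnp.plugins.udf.simple.Counter; on_off is omitted.

```python
from typing import Any, Dict


class Counter:
    """Hypothetical stateful UDF resembling the Counter above (not pnp's source)."""

    def __init__(self, init: int = 0, name: str = "counter") -> None:
        self.count = init
        self.name = name

    def __call__(self) -> int:
        # Return the current value, then advance the stored state.
        current = self.count
        self.count += 1
        return current


def register_udfs() -> Dict[str, Any]:
    """Build a selector namespace: with args -> instance, without args -> plain callable."""
    return {
        "my_str": str,             # builtin registered under another name
        "count": Counter(init=1),  # 'args: init: 1' -> one shared instance
        "count0": Counter(),       # empty 'args' -> instance with the default init
        "count_err": Counter,      # no 'args' -> the class itself is the callable
    }


namespace = register_udfs()
# The UDFs are passed as globals so the lambda bodies can resolve them.
cnt1 = eval("lambda d: count()", dict(namespace))
cnterr = eval("lambda d: count_err(name='error')", dict(namespace))

print(cnt1("Hello World"), cnt1("Hello World"))  # 1 2
print(type(cnterr("Hello World")).__name__)       # Counter
```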

See also

UDF throttle