Pull ‘n’ Push

Installation

Installation is done by pip. Simple and straightforward:

pip install pnp

Some plugins might require some extras to work properly. Installation of extras is simple as well.

Example: The plugin pnp.plugins.pull.fitbit.Current requires the extra fitbit. To install the extra:

pip install pnp[fitbit]

If you need to install multiple extras:

pip install pnp[extra1,extra2,extra3]

Please consult the plugin documentation to see if a component requires an extra or not.

Important

We cannot ensure not to introduce any breaking changes to interfaces / behaviour. This might occur every commit whether it is intended or by accident. Nevertheless we try to list breaking changes in the changelog that we are aware of.

You are encouraged to specify the version explicitly in your dependency tools, e.g.:

pip install pnp==0.23.0

Warning

It seems that the package uvloop has some issues on debian / raspbian stretch. pnp can run without uvloop but sanic installs it as a dependency anyways (but it is optional in our case). Unfortunately we can’t simply not install uvloop in the first place because of pnp’s package locking mechanism powered by poetry. As far as I can tell there is no way to instruct poetry to inject the necessary environment variable to tell sanic not to install uvloop. See the sanic documentation for more details.

In short: If you experience segmentation faults after running pnp you should consider to remove uvloop from your site-packages.:

pip uninstall uvloop

Quickstart

The major concepts of Pull ‘n’ Push are pulls and pushes (surprise!).

Pulls define how to fetch data from sources. A pull passes it’s fetched data to one or more push components.

Note

Example: The pull pnp.plugins.pull.monitor.Stats periodically fetches statistics from the host it is running on (like cpu and ram usage). It passes it’s statistics on to the pnp.plugins.push.mqtt.Publish to publish those statistics on a mqtt broker.

Gluing together pulls and pushes is done by a task. It’s similar to defining an input / output pipeline.

Now let’s get serious. Copy and paste the following configuration to the file helloworld.yaml.

tasks:
  - name: hello-world
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        interval: 1s
        repeat: "Hello World"
    push:
      - plugin: pnp.plugins.push.simple.Echo

And run it by invoking the console runner:

pnp helloworld.yaml

This example yields the string 'Hello World' every second.

Console Runner

If you install pnp via pip it will automatically install a script you can invoke:

> pnp --help
Usage: pnp [OPTIONS] CONFIGFILE

  Pull 'n' Push. Runs or checks the given CONFIGFILE

Options:
  -c, --check                     Only check the given config file but does
                                  not run it.

  --no-log-probe                  Disables the automatic logging configuration
                                  probing.

  --log FILE                      Specify logging configuration to load.
  --log-level [DEBUG|INFO|WARNING|ERROR]
                                  Overrides the log level.
  --version                       Show the version and exit.
  --help                          Show this message and exit.

Alternatively you can call the module via python interpreter:

python3 -m pnp --help

Building Blocks

Pull

The pull is the primary building block when it comes down to fetching / retrieving data from various sources.

Some examples:

  • Start up a FTP server and wait for new uploaded files
  • Listen to created, updated or deleted files in the file system
  • Listen to a mqtt broker for new messages on a specific topic
  • Frequently poll the fitbit api to fetch the current step count
  • Frequently poll the host for statistics (like cpu / ram usage)

As you can see pulls can either

  • react on one or multiple events (like files created, updated, deleted)
  • or frequently poll a source

to provide data to its downstream: the push

# Counts from 0 to infinity. Emits a new counter every second.

tasks:
  - name: pull  # Task name -> Glues together a pull and 1..n pushes
    pull:
      plugin: pnp.plugins.pull.simple.Count  # Fully qualified path to the pull
      args:
        interval: 1s  # Every second
    push:
      - plugin: pnp.plugins.push.simple.Echo

Push

A push simply processes the downstream data from a pull.

Some examples:

  • Publish a message on a specific topic on an mqtt broker
  • Send a templated notification via slack
  • Execute a script using derived arguments
  • Send a http request to a server
  • Simply echo the data to the console (nice for debugging)
# The first push Will publish the counter to the 'home/counter/state' topic
# on a locally running mqtt broker.
# The second push will print the counter to the console.

tasks:
  - name: push
    pull:
      plugin: pnp.plugins.pull.simple.Count
    push:
      - plugin: pnp.plugins.push.mqtt.Publish  # Fully qualified path of the push
        args:  # Arguments
          host: localhost
          topic: home/counter/state
          port: 1883
          retain: true
      # You can specify more than one push.
      # Multiple pushes will be executed in parallel.
      - plugin: pnp.plugins.push.simple.Echo

Selector

Sometimes the output of a pull needs to be transformed before the downstream push can handle it. Selectors to the rescue: A selector transforms the output of a pull by using pure python code.

# Selector will transform output to 'x is Even' resp. 'x id Odd'
# when the counter is even resp. odd.

tasks:
  - name: selector
    pull:
      plugin: pnp.plugins.pull.simple.Count
      args:
        interval: 1s
    push:
      - plugin: pnp.plugins.push.simple.Echo
        # The selector is written in pure python
        selector: "'{} is {}'.format(payload, 'Even' if payload % 2 == 0 else 'Odd')"

Easy as that. We can reference our incoming data via data or payload.

Dependencies

By default all defined pushes will execute in parallel when new incoming data is available. But now it would be awesome if we could chain pushes together. So that the output of one push becomes the input of the next push. The good thing is: Yes we can.

# Executes the command `echo hello world` at the shell level
# and passes the exit code, stdout and stderr to the next
# downstream push.

tasks:
  - name: dependencies
    pull:
      plugin: pnp.plugins.pull.simple.Count
      args:
        interval: 1s
    push:
      plugin: pnp.plugins.push.simple.Execute
      args:
        command: "echo hello world"
        capture: true  # Capture exit code, stdout and stderr
      deps:
        - plugin: pnp.plugins.push.simple.Echo

Envelope

New in version 0.7.0.

By using an envelope it is possible to change the behavior of pushes during runtime. One of the best examples is pnp.plugins.push.mqtt.Publish plugins where you can override the actual topic where the message should be published.

Easier to understand by a practical example:

# Uses an advanced selector to override push arguments during
# runtime

tasks:
  - name: envelope
    pull:
      plugin: pnp.plugins.pull.simple.Count
      args:
        interval: 1s
    push:
      - plugin: pnp.plugins.push.mqtt.Publish
        # This is an advanced selector. See section Advanced Selector for details
        selector:
          # This is the actual counter
          payload: "lambda payload: payload"
          # This configures the topic based on the counter payload
          # It can be either counter/even or counter/odd
          topic: "lambda payload: 'counter/{}'.format('even' if payload % 2 == 0 else 'odd')"
        args:
          host: localhost
          topic: home/counter/state
          port: 1883
          retain: true

How does this work

If the emitted or transformed payload (via selector) contains the key data or payload the pipeline assumes that this is the actual payload and all other keys represent the so called envelope.

Note

It is easy to construct envelopes by using the Advanced Selector Expression

UDFs

New in version 0.14.0.

When transforming the output of a pull with the help of a selector you sometimes need to call python functions to perform certain operations. A selector does only provide very few basic functions like type conversions (str, float, …) out of the box.

But the good thing is: You can register simple python builtin functions, special user defined functions and your own python functions to make use of them.

Besides providing function logic UDFs can carry a state and some special behavior like out of the box throttling.

udfs:
  # Register the builtin str callable as my_str
  - name: my_str
    plugin: str
  # Register a custom callable
  - name: my_on_off
    plugin: pnp.utils.on_off
  # Instantiate a Counter user defined function
  - name: count
    plugin: pnp.plugins.udf.simple.Counter
    # The presence of args tells pnp to instantiate a Counter
    # -> important because it has a state (the actual count)
    args:
      init: 1
  - name: count0
    plugin: pnp.plugins.udf.simple.Counter
    # Even without an argument (init is 0 by default), the args is important:
    # This tells pnp to instantiate a Counter. Otherwise it is assumed to be a callable
    args:
  - name: count_err
    # Without args. Each time you call count_err this will actually call the __init__ of the Counter class
    plugin: pnp.plugins.udf.simple.Counter
tasks:
  - name: hello-world
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        interval: 1s
        repeat: "Hello World"
    push:
      - plugin: pnp.plugins.push.simple.Echo
        selector:
          on_off: "lambda d: my_on_off(True)"
          str: "lambda d: my_str(d)"
          cnt0: "lambda d: count0()"
          cnt1: "lambda d: count()"
          # This will actually create a counter class instead of counting
          cnterr: "lambda d: count_err(name='error')"
          data: "lambda d: d"

See also

UDF throttle

Advanced topics

Advanced selector expressions

New in version 0.12.0.

Instead of string-only selector expressions, you may now use complex dictionary and/or list constructs in your yaml to define a selector expression. If you use a dictionary or a list make sure to provide “real” selectors as a lambda expression, so the evaluator can decide if this is a string literal or an expression to evaluate.

The configuration below will repeat

{'hello': 'Hello', 'words': ['World', 'Moon', 'Mars']}
tasks:
  - name: selector
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        wait: 1
        repeat: "Hello World Moon Mars"
    push:
      - plugin: pnp.plugins.push.simple.Echo
        selector:
          hello: "lambda payload: payload.split(' ')[0]"
          words:
            - "lambda payload: payload.split(' ')[1]"
            - "lambda payload: payload.split(' ')[2]"
            - "lambda payload: payload.split(' ')[3]"

Prior to the implementation of the advanced selector feature your expressions would have probably looked similar to this:

dict(hello=payload.split(' ')[0], words=[payload.split(' ')[1], payload.split(' ')[2], payload.split(' ')[3]])

The first one is more readable, isn’t it?

A more complex example showcasing literals and lambda expressions in more depth:

tasks:
  - name: selector
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        interval: 1s
        repeat: "Hello World"
    push:
      - plugin: pnp.plugins.push.simple.Echo
        # Returns: 'World'
        # no complex structure. Evaluator assumes that this is an expression -> you do not need a lambda
        selector: "str(payload.split(' ')[0])"
      - plugin: pnp.plugins.push.simple.Echo
        # Returns {'header': 'this is a header', 'data': 'World', 'Hello': 'World'}
        selector:
          # Just string literals
          header: this is a header
          # Value is lambda and therefore evaluated
          data: "lambda data: data.split(' ')[1]"
          # Both are lambdas and therefore evaluated
          "lambda data: str(data.split(' ')[0])": "lambda data: data.split(' ')[1]"
      - plugin: pnp.plugins.push.simple.Echo
        # Returns ['foo', 'bar', 'Hello', 'World']
        selector:
          - foo  # String literal
          - bar  # String literal
          # Lambda -> evaluate the expression
          - "lambda d: d.split(' ')[0]"
          # Lambda -> evaluate the expression
          - "lambda d: d.split(' ')[1]"

API

New in version 0.24.0.

You can explicitly activate an rest-api to perform various stuff:

Determine if everything is up and running:

curl -X GET "http://localhost:9999/health"

Retrieve the current python and pnp version:

curl -X GET "http://localhost:9999/version"

Change the log level at runtime. Useful if you want to track down an issue. Example:

curl -X POST "http://localhost:9999/loglevel?level=DEBUG"

Retrieve prometheus compliant metrics:

curl -X GET "http://localhost:9999/metrics"

See the openapi specification in your browser: http://localhost:9999/docs

Trigger tasks manually (even when the task has some schedule assigned):

curl -X POST "http://localhost:9999/trigger?task=<task_name>"

Note

You need to explicitly enable the /metrics endpoint while configuring your api. Please see the example below for reference.

api:
  port: 9999  # API listens on port 9999; mandatory
  endpoints:  # Optional
    # Enable metrics endpoint: http://localhost:9999/metrics, default is false.
    metrics: true
tasks:
  - name: task
    pull:
      plugin: pnp.plugins.pull.monitor.Stats
      args:
        instant_run: true
        interval: "*/1 * * * *"
    push:
      - plugin: pnp.plugins.push.simple.Echo

To create a task with no schedule assigned you can pass nothing to the interval argument of your task. This way the task will not be scheduled automatically. The only way to run it is to do it explicitly via the api.

api:
  port: 9999
  endpoints:
    metrics: false
tasks:
  - name: my_task
    pull:
      plugin: pnp.plugins.pull.monitor.Stats
      args:
        # No schedule -> Execution: `curl -X POST http://localhost:9999/trigger?task=my_task`
        interval:
    push:
      - plugin: pnp.plugins.push.simple.Echo

Augment configuration

Deprecated since version 0.28.0: Use YAML Tags. dictmentor for configuration augmentation will be removed in a future release.

You can augment the configuration by extensions from the dictmentor package. Please see dictmentor on Github for further reference.

The dictmentor instance will be instantiated with the following code and thus the following extensions:

from dictmentor import DictMentor, ext
return DictMentor(
  ext.Environment(fail_on_unset=True),
  ext.ExternalResource(base_path=os.path.dirname(config_path)),
  ext.ExternalYamlResource(base_path=os.path.dirname(config_path))
)

The following configuration will demonstrate the configuration augmentation in more depth:

# Uses the dictmentor package to augment the configuration
# by dictmentor extensions.
# Make sure to export the environment variable to echo:
# export MESSAGE="Hello World"

tasks:
  - name: dictmentor
    pull:
      external: repeat.pull
    push:
      - external: echo.push
      - external: nop.push
# Contents of repeat.pull
plugin: pnp.plugins.pull.simple.Repeat
args:
  wait: 1
  repeat: "{{env::MESSAGE}}"
# Contents of echo.push
plugin: pnp.plugins.push.simple.Echo
# Contents of nop.push
plugin: pnp.plugins.push.simple.Nop

Engines

In version 0.22.0 I’ve decided to remove all engines except for the Async engine. Due to some tests this engine is amazingly stable, has great performance and you do not need to think about synchronizing parallel tasks so much.

This decision was basically driven from a maintenance view of perspective. In the future I like to add some infrastructural code like an api to communicate with the engine. And I will not be able to integrate necessary changes to all of the engines.

By default the AsyncEngine is used implicitly:

tasks:  # Implicit use of the AsyncEngine
  - name: async
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        interval: 1s
        repeat: "Hello World"
    push:
      - plugin: pnp.plugins.push.simple.Echo

You can add it explicitly though. This might be useful in the future IF I decide to add variants of the AsyncEngine or if you want to use a RetryHandler.

engine: !engine  # Use the AsyncEngine explicitly
  type: pnp.engines.AsyncEngine
  retry_handler: !retry
    type: pnp.engines.SimpleRetryHandler
tasks:
  - name: async
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        interval: 1s
        repeat: "Hello World"
    push:
      - plugin: pnp.plugins.push.simple.Echo

Logging

New in version 0.11.0.

You can use different logging configurations in two ways: Either by specify the logging configuration when starting pnp via the console or by setting the environment variable PNP_LOG_CONF and then run pnp.

# Specify explicitly when starting pnp
pnp --log=<logging_configuration> <pnp_configuration>

# Specify by environment variable
export PNP_LOG_CONF=<logging_configuration>
pnp <pnp_configuration>

If you do not specify a logging configuration explicitly pnp will search for a logging.yaml 1. in the current working directory 2. in the directory where your pnp configuration file is located

You can disable this automatic and implicit logging configuration probing by starting pnp by passing --no-log-probe:

pnp --no-log-probe <pnp_configuration>

A simple logging configuration that will log severe errors to a separate rotating log file looks like this:

version: 1
disable_existing_loggers: False

formatters:
    simple:
        format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

handlers:
    console:
        class: logging.StreamHandler
        level: DEBUG
        formatter: simple
        stream: ext://sys.stdout

    error_file_handler:
        class: logging.handlers.RotatingFileHandler
        level: ERROR
        formatter: simple
        filename: errors.log
        maxBytes: 10485760 # 10MB
        backupCount: 20
        encoding: utf8

root:
    level: INFO
    handlers: [console, error_file_handler]

Payload unwrapping

Sometimes a pull returns a list of items or you created a list of readings by using the selector. As long as a push can process a list as an input everything is fine. But now imagine that you want to process each item of that list individually. Unwrapping to the rescue. Unwrapping will exactly allow you what was asked for above: Pass each item of a list to a push individually.

tasks:
  - name: unwrapping
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        interval: 1s
        repeat:
          - 1
          - 2
          - 3
    push:
      - plugin: pnp.plugins.push.simple.Echo
        unwrap: true  # Magic!

But sometimes you want to add items to the list before processing it item by item. You can do this by using a push.simple.Nop properly.

tasks:
  - name: unwrapping
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        interval: 1s
        repeat:
          - 1
          - 2
          - 3
    push:
      - plugin: pnp.plugins.push.simple.Nop
        # You can add items to a list ...
        selector: "data + [4, 5, 6]"
        # ... and not unwrap (which is the default) ...
        unwrap: false
        # ... and use unwrap: true in a dependant push
        deps:
          - plugin: pnp.plugins.push.simple.Echo
            unwrap: true

Retry handler

By using the explicit form of the engine specification you can specify a so called RetryHandler as well. A RetryHandler controls what happens if your push unexpectedly exits or raises an error.

NoRetryHandler

Will not retry ro run the pull if it fails. If all pulls exit for some reason pnp will exit as well.

engine: !engine
  type: pnp.engines.AsyncEngine
  retry_handler: !retry
    type: pnp.engines.NoRetryHandler
tasks:
  - name: async
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        interval: 1s
        repeat: "Hello World"
    push:
      - plugin: pnp.plugins.push.simple.Echo

SimpleRetryHandler

In addition to the NoRetryHandler this handler will retry after a specified amount of time configured by passing retry_wait (default is 60 seconds).

engine: !engine  # Use the AsyncEngine explicitly
  type: pnp.engines.AsyncEngine
  retry_handler: !retry
    type: pnp.engines.SimpleRetryHandler
    retry_wait: 60s  # Default is 60 seconds as well
tasks:
  - name: async
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        interval: 1s
        repeat: "Hello World"
    push:
      - plugin: pnp.plugins.push.simple.Echo

LimitedRetryHandler

In addition to retrying the pull you can configure how many retries are allowed by passing max_retries (default is 3). Each failed pull execution will raise the counter and the counter will not reset.

engine: !engine  # Use the AsyncEngine explicitly
  type: pnp.engines.AsyncEngine
  retry_handler: !retry
    type: pnp.engines.LimitedRetryHandler
    retry_wait: 60s  # Default is 60 seconds as well
    # Retry to run the pull 3 times before giving up
    max_retries: 3
tasks:
  - name: async
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        interval: 1s
        repeat: "Hello World"
    push:
      - plugin: pnp.plugins.push.simple.Echo

AdvancedRetryHandler

In addition to the LimitedRetryHandler the counter will be reset if the pull has ran successful for a specific amount of time by passing reset_retry_threshold (default is 60 seconds).

engine: !engine  # Use the AsyncEngine explicitly
  type: pnp.engines.AsyncEngine
  retry_handler: !retry
    type: pnp.engines.AdvancedRetryHandler
    retry_wait: 60s  # Default is 60 seconds as well
    # Retry to run the pull 3 times before giving up
    max_retries: 3
    # Will reset the retry count after 60 seconds
    # of successful execution of the pull
    reset_retry_threshold: 60s
tasks:
  - name: async
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        interval: 1s
        repeat: "Hello World"
    push:
      - plugin: pnp.plugins.push.simple.Echo

Note

If no RetryHandler is explicitly specified the AdvancedRetryHandler will be used. The instance created will use the default values for it’s arguments.

Suppress push

Sometimes you want to suppress (do not execute) a push based on the actual incoming payload from a pull. To achieve this all you have to do is to emit the special sentinel SUPPRESS from your selector expression.

tasks:
  - name: selector
    pull:
      plugin: pnp.plugins.pull.simple.Count
      args:
        interval: 1s
    push:
      - plugin: pnp.plugins.push.simple.Echo
        # Only print the counter if it is even. Suppress odd ones.
        selector: "payload if payload % 2 == 0 else SUPPRESS"

UDF Throttle

Consider the following situation: You have a selector that uses a udf to fetch a state from an external system.

The state won’t change so often, but your selector will fetch the state every time a pull transports a payload to the push. You want to decrease the load on the external system and you want to increase throughput.

Throttling to the rescue. Specifying throttle when instantiating your udf will manage that your udf will only call the external system once and cache the result. Subsequent calls will either return the cached result or call the external system again when a specified time has passed since the last call that actually fetched a result from the external system.

udfs:
  - name: count  # Instantiate a Counter user defined function
    plugin: pnp.plugins.udf.simple.Counter
    args:  # The presence of args tells pnp to instantiate a Counter - important because it has a state (the actual count)
      init: 1
      # Will only call the counter if 10 seconds passed between current call and last call.
      # In the meantime a cached result will be returned.
      throttle: 10s
tasks:
  - name: hello-world
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        interval: 1s
        repeat: "Hello World"
    push:
      - plugin: pnp.plugins.push.simple.Echo
        selector:
          counter: "lambda d: count()"

YAML Tags

You can use yaml tags to augment your yaml configuration. Right now the following tags are supported:

  • !include: To incorporate another yaml file into your configuration
  • !env: To inject a environment variable. Use := to denote a default value. The configuration loading process will fail if the environment variable is not set and no default is given.

The following example will demonstrate the yaml tags in more depth:

# Use !include to incorporate another yaml file into this one

tasks:
  - name: tags
    pull: !include _repeat.yaml
    push:
      - !include _echo.yaml
      - !include _nop.yaml
# Contents of _repeat.yaml
plugin: pnp.plugins.pull.simple.Repeat
args:
  wait: 1
  repeat: !env MESSAGE:=Hello World
# Contents of _echo.yaml
plugin: pnp.plugins.push.simple.Echo
# Contents of _nop.yaml
plugin: pnp.plugins.push.simple.Nop

Plugins

Pulls

Pulls can be divided into pulls that react on events and pulls that regularly poll for data.

So called Polling components are special pulls that - as stated earlier - regularly poll data or just execute in regular intervals.

Besides the arguments stated in the component description polls always have the following arguments to control their polling behavior.

name type opt. default description
interval str/float yes 60s You may specify duration literals such as 60 (60 secs), 1m, 1h (…) to realize a periodic polling or cron expressions e.g. */1 * * * * (every minute) to realize cron like behavior.
instant_run bool yes False If set to True the component will run as soon as pnp starts; otherwise it will run the next configured interval.

fitbit.Current

plugin type extra version
pnp.plugins.pull.fitbit.Current poll fitbit 0.13.0

Description

Requests various current metrics (steps, calories, distance, …) from the fitbit api for a specific account.

Please see Fitbit Authentication to configure to prepare your account accordingly.

Arguments

name type opt. default description
config str no n/a The configuration file that keeps your initial and refreshed authentication tokens (see Fitbit Authentication for detailed information)
resources List[str] no n/a The resources to request (see below for possible options)
system str yes None The metric system to use based on your localisation (de_DE, en_US, …). Default is your configured metric system in your fitbit account

Note

You can query the following resources:

  • activities/calories
  • activities/caloriesBMR
  • activities/steps
  • activities/distance
  • activities/floors
  • activities/elevation
  • activities/minutesSedentary
  • activities/minutesLightlyActive
  • activities/minutesFairlyActive
  • activities/minutesVeryActive
  • activities/activityCalories
  • body/bmi
  • body/fat
  • body/weight
  • foods/log/caloriesIn
  • foods/log/water
  • sleep/awakeningsCount
  • sleep/efficiency
  • sleep/minutesAfterWakeup
  • sleep/minutesAsleep
  • sleep/minutesAwake
  • sleep/minutesToFallAsleep
  • sleep/startTime
  • sleep/timeInBed

Result

Emits a map that contains the requested resources and their associated values:

{
  "activities/calories": 1216,
  "activities/caloriesBMR": 781,
  "activities/steps": 4048,
  "activities/distance": 3.02385,
  "activities/floors": 4,
  "activities/elevation": 12,
  "activities/minutes_sedentary": 127,
  "activities/minutes_lightly_active": 61,
  "activities/minutes_fairly_active": 8,
  "activities/minutes_very_active": 24,
  "activities/activity_calories": 484,
  "body/bmi": 23.086421966552734,
  "body/fat": 0.0,
  "body/weight": 74.8,
  "foods/log/calories_in": 0,
  "foods/log/water": 0.0,
  "sleep/awakenings_count": 0,
  "sleep/efficiency": 84,
  "sleep/minutes_after_wakeup": 0,
  "sleep/minutes_asleep": 369,
  "sleep/minutes_awake": 69,
  "sleep/minutes_to_fall_asleep": 0,
  "sleep/start_time": "21:50",
  "sleep/time_in_bed": 438
}

Example

# Please point your environment variable `FITBIT_AUTH` to your authentication
# configuration
- name: fitbit_current
  pull:
    plugin: pnp.plugins.pull.fitbit.Current
    args:
      config: !env FITBIT_AUTH
      instant_run: true
      interval: 5m
      resources:
        - 'activities/calories'
        - 'activities/caloriesBMR'
        - 'activities/steps'
        - 'activities/distance'
        - 'activities/floors'
        - 'activities/elevation'
        - 'activities/minutesSedentary'
        - 'activities/minutesLightlyActive'
        - 'activities/minutesFairlyActive'
        - 'activities/minutesVeryActive'
        - 'activities/activityCalories'
        - 'body/bmi'
        - 'body/fat'
        - 'body/weight'
        - 'foods/log/caloriesIn'
        - 'foods/log/water'
        - 'sleep/awakeningsCount'
        - 'sleep/efficiency'
        - 'sleep/minutesAfterWakeup'
        - 'sleep/minutesAsleep'
        - 'sleep/minutesAwake'
        - 'sleep/minutesToFallAsleep'
        - 'sleep/startTime'
        - 'sleep/timeInBed'
  push:
    - plugin: pnp.plugins.push.simple.Echo

fitbit.Devices

plugin type extra version
pnp.plugins.pull.fitbit.Devices poll fitbit 0.13.0

Description

Requests details about your fitbit devices / trackers (battery, model, …) associated to your account.

Please see Fitbit Authentication to configure to prepare your account accordingly.

Arguments

name type opt. default description
config str no n/a The configuration file that keeps your initial and refreshed authentication tokens (see Fitbit Authentication for detailed information)
system str yes None The metric system to use based on your localisation (de_DE, en_US, …). Default is your configured metric system in your fitbit account

Result

Emits a list that contains your available trackers and/or devices and their associated details:

[{
  "battery": "Empty",
  "battery_level": 10,
  "device_version": "Charge 2",
  "features": [],
  "id": "abc",
  "last_sync_time": "2018-12-23T10:47:40.000",
  "mac": "AAAAAAAAAAAA",
  "type": "TRACKER"
}, {
  "battery": "High",
  "battery_level": 95,
  "device_version": "Blaze",
  "features": [],
  "id": "xyz",
  "last_sync_time": "2019-01-02T10:48:39.000",
  "mac": "FFFFFFFFFFFF",
  "type": "TRACKER"
}]

Example

# Please point your environment variable `FITBIT_AUTH` to your authentication
# configuration

tasks:
  - name: fitbit_devices
    pull:
      plugin: pnp.plugins.pull.fitbit.Devices
      args:
        config: !env FITBIT_AUTH
        instant_run: true
        interval: 15m
    push:
      - plugin: pnp.plugins.push.simple.Echo

fitbit.Goal

plugin type extra version
pnp.plugins.pull.fitbit.Goal poll fitbit 0.13.0

Description

Requests your goals (water, steps, …) from the fitbit api.

Please see Fitbit Authentication to configure to prepare your account accordingly.

Arguments

name type opt. default description
config str no n/a The configuration file that keeps your initial and refreshed authentication tokens (see Fitbit Authentication for detailed information)
resources List[str] no n/a The resources to request (see below for possible options)
system str yes None The goals to request (see below for detailed information)

Note

You can query the following resources:

  • body/fat
  • body/weight
  • activities/daily/activeMinutes
  • activities/daily/caloriesOut
  • activities/daily/distance
  • activities/daily/floors
  • activities/daily/steps
  • activities/weekly/distance
  • activities/weekly/floors
  • activities/weekly/steps
  • foods/calories
  • foods/water

Result

Emits a map structure that consists of the requested goals:

{
  "body/fat": 15.0,
  "body/weight": 70.0,
  "activities/daily/active_minutes": 30,
  "activities/daily/calories_out": 2100,
  "activities/daily/distance": 5.0,
  "activities/daily/floors": 10,
  "activities/daily/steps": 6000,
  "activities/weekly/distance": 5.0,
  "activities/weekly/floors": 10.0,
  "activities/weekly/steps": 6000.0,
  "foods/calories": 2220,
  "foods/water": 1893
}

Example

# Please point your environment variable `FITBIT_AUTH` to your authentication
# configuration

tasks:
  - name: fitbit_goal
    pull:
      plugin: pnp.plugins.pull.fitbit.Goal
      args:
        config: !env FITBIT_AUTH
        instant_run: true
        interval: 5m
        goals:
          - body/fat
          - body/weight
          - activities/daily/activeMinutes
          - activities/daily/caloriesOut
          - activities/daily/distance
          - activities/daily/floors
          - activities/daily/steps
          - activities/weekly/distance
          - activities/weekly/floors
          - activities/weekly/steps
          - foods/calories
          - foods/water
    push:
      - plugin: pnp.plugins.push.simple.Echo

fs.FileSystemWatcher

plugin type extra version
pnp.plugins.pull.fs.FileSystemWatcher pull fswatcher < 0.10.0

Description

Watches the given directory for changes like created, moved, modified and deleted files.

Per default will recursively report any file that is touched, changed or deleted in the given path. The directory itself or subdirectories will be object to reporting too, if ignore_directories is set to False.

Arguments

name type opt. default description
path str no n/a The path to track for file / directory changes
recursive bool yes True If set to True, any subfolders of the given path will be tracked too.
patterns List[str] yes None Any file pattern (e.g. *.txt or *.txt, *.md. If set to None no filter is applied.
ignore_patterns List[str] yes None Any patterns to ignore (specify like argument patterns). If set to None, nothing will be ignored.
ignore_directories bool yes False If set to True will send events for directories when file change.
case_sensitive bool yes False If set to True, any pattern is case_sensitive, otherwise it is case insensitive.
events List[str] yes None The events to track. One or multiple of ‘moved’, ‘deleted’, ‘created’ and/or ‘modified’. If set to None all events will be reported.
load_file bool yes False If set to True the file contents will be loaded into the result.
mode str yes auto Open mode of the file (only necessary when load_file is True). Can be text, binary or auto (guessing).
base64 bool yes False If set to True the loaded file contents will be converted to base64 (only applicable when load_file is True). Argument mode will be automatically set to ‘binary’
defer_modified float yes 0.5 There might be multiple flushes of a file before it is written completely to disk. Without defer_modified each flush will raise a modified event.

Result

Example of an emitted message:

{
    "operation": "modified",
    "source": "/tmp/abc.txt",
    "is_directory": False,
    "destination": None,  # Only non-None when operation = "moved"
    "file": {  # Only present when load_file is True
        "file_name": "abc.txt",
        "content": "foo and bar",
        "read_mode": "text",
        "base64": False
    }
}

Example

tasks:
  - name: file_watcher
    pull:
      plugin: pnp.plugins.pull.fs.FileSystemWatcher
      args:
        path: "/tmp"
        ignore_directories: true
        events: [created, deleted, modified]
        load_file: false
    push:
      plugin: pnp.plugins.push.simple.Echo

fs.Size

plugin type extra version
pnp.plugins.pull.fs.Size poll none 0.17.0

Description

Periodically determines the size of the specified files or directories in bytes.

Arguments

name type opt. default description
paths List[str] no n/a List of files and/or directories to monitor their sizes in bytes.
fail_on_error bool yes True If set to true, the plugin will raise an error when a file/directory does not exists or any other file system related error occurs. Otherwise the plugin will proceed and simply report None as size.

Note

Be careful when adding directories with a large amount of files. This will be prettly slow cause the plugin will iterate over each file and determine it’s individual size.

Result

Example of an emitted message. Size is in bytes.

{
  "logs": 32899586,
  "copy": 28912
}

Example

tasks:
  - name: file_size
    pull:
      plugin: pnp.plugins.pull.fs.Size
      args:
        instant_run: true
        interval: 5s
        fail_on_error: false
        paths:
          logs: /var/log  # directory - recursively determines size
          copy: /bin/cp  # file
    push:
      plugin: pnp.plugins.push.simple.Echo

ftp.Server

plugin type extra version
pnp.plugins.pull.ftp.Server pull ftp 0.17.0

Description

Runs a ftp server on the specified port to receive and send files by ftp protocol.

Optionally sets up a simple user/password authentication mechanism.

Arguments

name type opt. default description
directory str yes None The directory to serve via ftp protocol. If not given a directory is created is created temporarily to accept incoming uploads.
port int yes 2121 The port to listen on.
user_pwd str / Tuple[str] yes None User/password combination (as a tuple/list; see example). You may specify the user only - password will be empty OR you can enable anonymous access by not providing the argument.
events List[str] yes None A list of events to subscribe to. Available events are: connect, disconnect, login, logout, file_received, file_sent, file_received_incomplete, file_sent_incomplete. By default all events are subscribed.
max_cons int yes 256 The maximum number of simultaneous connections the ftpserver will permit.
max_cons_ip int yes 5 The maximum number of simultaneous connections from the same ip. Default is 5.

Result

All emitted messages will have an event field to identify the type of the event and an - optional - data field.

The data field will contain the user for login/logout events and the file_path for file-related events.

{
  "event": "file_received",
  "data": {
    "file_path": "/private/tmp/ftp/test.txt"
  }
}

Example

tasks:
  - name: ftp_server
    pull:
      plugin: pnp.plugins.pull.ftp.Server
      args:
        directory: !env FTP_DIR
        user_pwd: [admin, root]  # user: admin, pw: root
        events:
          - file_received
          - file_sent
    push:
      - plugin: pnp.plugins.push.simple.Echo

gpio.Watcher

plugin type extra version
pnp.plugins.pull.gpio.Watcher poll gpio 0.12.0

Description

Listens for low/high state changes on the configured gpio pins.

In more detail the plugin can raise events when one of the following situations occur:

  • rising (high) of a gpio pin - multiple events may occur in a short period of time
  • falling (low) of a gpio pin - multiple events may occur in a short period of time
  • switch of gpio pin - will suppress multiple events a defined period of time (bounce time)
  • motion of gpio pin - will raise the event motion_on if the pin rises and set a timer with a configurable amount of time. Any other gpio rising events will reset the timer. When the timer expires the motion_off event is raised.

Arguments

name type opt. default description
pins List[int] no n/a The gpio pins to observe for state changes. Please see the examples section on how to configure it.
default str no n/a The default edge that is applied when not configured. Please see the examples section for further details. One of rising, falling, switch, motion

Result

Emits a dictionary that contains an entry for every sensor of the plant sensor device:

{
  "gpio_pin": 17  # The gpio pin which state has changed
  "event": rising  # One of [rising, falling, switch, motion_on, motion_off]
}

Example

tasks:
  - name: gpio
    pull:
      plugin: pnp.plugins.pull.gpio.Watcher
      args:
        default: rising
        pins:
          - 2               # No mode specified: Default mode (in this case 'rising')
          - 2               # Duplicates will get ignored
          - 3:rising        # Equal to '3' (without explicit mode)
          - 3:falling       # Get the falling event for gpio pin 3 as well
          - 4:switch        # Uses some debouncing magic and emits only one rising event
          - 5:switch(1000)  # Specify debounce in millseconds (default is 500ms)
          - 5:switch(500)   # Duplicates - even when they have other arguments - will get ignored
          - 7:motion        # Uses some delay magic to emit only one motion on and one motion off event
          - 9:motion(1m)    # Specify delay (default is 30 seconds)
    push:
      - plugin: pnp.plugins.push.simple.Echo

hass.State

plugin type extra version
pnp.plugins.pull.hass.State pull none 0.14.0

Description

Connects to the home assistant websocket api and listens for state changes. If no include or exclude is defined it will report all state changes. If include is defined only entities that match one of the specified patterns will be emitted. If exclude if defined entities that match at least one of the specified patterns will be ignored. exclude patterns overrides include patterns.

Arguments

name type opt. default description
host str no n/a Url to your home assistant instance (e.g. http://my-hass:8123)
token str no n/a Your long lived access token to access the websocket api. See below for further instructions
include List[str] yes n/a Patterns of entity state changes to include. All state changes that do not match the defined patterns will be ignored
exclude List[str] yes n/a Patterns of entity state changes to exclude. All state changes that do match the defined patterns will be ignored

Note

  • include and exclude support wildcards (e.g * and ?)
  • exclude overrides include. So you can include everything from a domain (sensor.*) but exclude individual entities.
  • Create a long lived access token: Home Assistant documentation

Result

The emitted result always contains the entity_id, new_state and old_state:

{
  "entity_id": "light.bedroom_lamp",
  "old_state": {
    "state": "off",
    "attributes": {},
    "last_changed": "2019-01-08T18:24:42.087195+00:00",
    "last_updated": "2019-01-08T18:40:40.011459+00:00"
  },
  "new_state": {
    "state": "on",
    "attributes": {},
    "last_changed": "2019-01-08T18:41:06.329699+00:00",
    "last_updated": "2019-01-08T18:41:06.329699+00:00"
  }
}

Example

tasks:
  - name: hass_state
    pull:
      plugin: pnp.plugins.pull.hass.State
      args:
        url: http://localhost:8123
        token: !env HA_TOKEN
        exclude:
          - light.lamp
        include:
          - light.*
    push:
      - plugin: pnp.plugins.push.simple.Echo

http.Server

plugin type extra version
pnp.plugins.pull.http.Server pull none < 0.10.0

Description

Creates a specific route on the builtin api server and listens to any call to that route. Any data passed to the endpoint will be tried to be parsed to a dictionary (json). If this is not possible the data will be passed as is. See sections Result for specific payload and examples.

Arguments

name type opt. default description
prefix_path str no n/a The route to create for incoming traffic on the builtin api server. See the Example section for reference.
allowed_methods List[str] yes GET List of http methods that are allowed. Default is ‘GET’.

Result

Assumes that you configured your pull with prefix_path = callme

curl -X GET "http://localhost:9999/callme/telephone/now?number=12345&priority=high" --data '{"magic": 42}'
{
  "endpoint": "telephone/now",
  "data": {"magic": 42},
  "levels": ["telephone", "now"],
  "method": "GET",
  "query": {"number": "12345", "priority": "high"},
  "is_json": True,
  "url": "http://localhost:9999/callme/telephone/now?number=12345&priority=high",
  "full_path": "/callme/telephone/now?number=12345&priority=high",
  "path": "/callme/telephone/now"
}

Example

#
# Registers the endpoint /callme to the builtin api server.
# Use curl to try it out:
#   curl -X GET "http://localhost:9999/callme/telephone/now?number=12345&priority=high" --data '{"magic": 42}'
#

api:  # You need to enable the api
  port: 9999  # Mandatory
tasks:
  - name: server
    pull:
      plugin: pnp.plugins.pull.http.Server
      args:
        prefix_path: callme  # Results into http://localhost:9999/callme
        allowed_methods:  # Specify which methods are allowed
          - GET
          - POST
    push:
      plugin: pnp.plugins.push.simple.Echo

monitor.Stats

plugin type extra version
pnp.plugins.pull.monitor.Stats poll none 0.12.0

Description

Emits every interval various metrics / statistics about the host system. Please see the Result section for available metrics.

Result

Emits a dictionary that contains an entry for every sensor of the plant sensor device:

{
  "cpu_count": 4,
  "cpu_freq": 2700,
  "cpu_temp": 0.0,
  "cpu_use": 80.0,
  "disk_use": 75.1,
  "load_1m": 2.01171875,
  "load_5m": 1.89501953125,
  "load_15m": 1.94189453125,
  "memory_use": 67.0,
  "rpi_cpu_freq_capped": 0,
  "rpi_temp_limit_throttle": 0,
  "rpi_throttle": 0,
  "rpi_under_voltage": 0,
  "swap_use": 36.1
}

Example

tasks:
  - name: stats
    pull:
      plugin: pnp.plugins.pull.monitor.Stats
      args:
        interval: 10s
        instant_run: true
    push:
      plugin: pnp.plugins.push.simple.Echo

mqtt.Subscribe

plugin type extra version
pnp.plugins.pull.mqtt.Subscribe pull none < 0.10.0

Description

Pulls messages from the specified topic from the given mosquitto mqtt broker (identified by host and port).

Arguments

name type opt. default description
host str no n/a Host where the mosquitto broker is running.
port int no n/a Port where the mosquitto broker is listening.
topic str no n/a Topic to listen for new messages. You can listen to multiple topics by using the #-wildcard (e.g. test/# will listen to all topics underneath test).

Result

The emitted message will look like this:

{
  "topic": "test/device/device1",
  "levels": ["test", "device", "device1"]
  "payload": "The actual event message"
}

Example

tasks:
  - name: mqtt
    pull:
      plugin: pnp.plugins.pull.mqtt.Subscribe
      args:
        host: localhost
        port: 1883
        topic: test/#
    push:
      plugin: pnp.plugins.push.simple.Echo

net.PortProbe

plugin type extra version
pnp.plugins.pull.net.PortProbe poll none 0.19.0

Description

Periodically establishes socket connection to check if anybody is listening on a given server on a specific port.

Arguments

name type opt. default description
port int no n/a The port to probe if a service is listening
server str yes localhost Server name or ip address
timeout float yes 1.0 Timeout for remote operations

Result

Emits a dictionary that contains an entry for every sensor of the plant sensor device:

{
  "server": "www.google.de",
  "port": 80,
  "reachable": True
}

Example

tasks:
  - name: port_probe
    pull:
      plugin: pnp.plugins.pull.net.PortProbe
      args:
        server: localhost  # Server name or ip address, default is localhost
        port: 9999  # The port to probe if somebody is listening
        interval: 5s  # Probe the port every five seconds ...
        instant_run: true  # ... and run as soon as pnp starts
    push:
      - plugin: pnp.plugins.push.simple.Echo

net.Speedtest

plugin type extra version
pnp.plugins.pull.net.Speedtest poll speedtest 0.25.0

Description

Performs a speedtest of your internet connection using speedtest.net

Arguments

No arguments

Result

{
  "download_speed_bps": 13815345.098080076,
  "download_speed_mbps": 13.82,
  "upload_speed_bps": 1633087.176468341,
  "upload_speed_mbps": 1.63,
  "ping_latency": 19.933,
  "result_image": "http://www.speedtest.net/result/10049630297.png",
  "server": {
    "name": "Deutsche Telekom",
    "host": "ham.wsqm.telekom-dienste.de:8080",
    "location": {
      "city": "Hamburg",
      "country": "Germany",
      "lat": "53.5653",
      "lon": "10.0014"
    }
  },
  "client": {
    "isp": "Vodafone DSL",
    "rating": "3.7"
  }
}

Example

tasks:
  - name: speedtest
    pull:
      plugin: pnp.plugins.pull.net.Speedtest
      args:
        num_parallel_requests: 2  # Number of parallel requests
        interval: 1h  # Run every hour
        instant_run: true  # Run as soon as pnp starts
    push:
      - plugin: pnp.plugins.push.simple.Echo

net.SSLVerify

plugin type extra version
pnp.plugins.pull.net.SSLVerify poll none 0.22.0

Description

Periodically checks if the ssl certificate of a given host is valid and how many days are remaining before the certificate will expire.

Arguments

name type opt. default description
host str no n/a The host to check for it’s SSL certificate.
timeout float yes 3.0 Timeout for remote operation.

Result

{
  # Envelope
  "host": "www.google.com",
  "payload": {
    "expires_days": 50,  # Remaining days before expiration
    "expires_at": datetime.datetime(2020, 5, 26, 9, 45, 52),  # Python datetime of expiration
    "expired": False  # True of the certificate is expired; otherwise False.
  }
}

Example

tasks:
  - name: ssl_verify
    pull:
      plugin: pnp.plugins.pull.net.SSLVerify
      args:
        host: www.google.com  # Check the ssl certificate for this host
        interval: 1m  # Check the ssl certificate every minute
        instant_run: true  # ... and run as soon as pnp starts
    push:
      - plugin: pnp.plugins.push.simple.Echo

presence.FritzBoxTracker

plugin type extra version
pnp.plugins.pull.presence.FritzBoxTracker poll fritz 0.22.0

Description

Periodically asks a Fritz!Box router for the devices that were connected in the past or right now.

Note

Extra fritz is only compatible with python 3.6 or higher

Arguments

name type opt. default description
host str yes 169.254.1.1 The IP address of your Fritz!Box.
user str yes admin The user to use.
password str yes <empty> The password to use.
offline_delay int yes 0 Defines how many intervals to wait before marking a device as not connected after the Fritz!Box reported the device as not connected anymore. This is useful for mobile devices that go temporarily to sleep and drop connection. Default is 0 -> Disconnected devices will be instantly reported as disconnected.
whitelist List[str] yes None A specific list of devices to track (identified by mac address). If not passed all devices will be fetched.

Note

By using the default values you should be able to connect to your Fritz!Box, because the necessary operation can be performed anonymously.

Result

{
  "ip": "192.168.178.2",
  "mac": "00:0a:95:9d:68:16",
  "status": True,  # True or False
  "name": "pc1"
}

Example

tasks:
  - name: fritzbox_tracker
    pull:
      plugin: pnp.plugins.pull.presence.FritzBoxTracker
      args:
        host: 169.254.1.1  # IP of your Fritz!Box. Default is 169.254.1.1
        user: admin  # User name. Default is admin
        password: ''  # Password. Default is an empty string
        offline_delay: 0  # How many intervals to wait before marking a device as not connected after the fritzbox reported so
        instant_run: true  # ... and run as soon as pnp starts
    push:
      - plugin: pnp.plugins.push.simple.Echo
tasks:
  - name: fritzbox_tracker_whitelist
    pull:
      plugin: pnp.plugins.pull.presence.FritzBoxTracker
      args:
        host: 169.254.1.1  # IP of your Fritz!Box. Default is 169.254.1.1
        user: admin  # User name. Default is admin
        password: ''  # Password. Default is an empty string
        offline_delay: 0  # How many intervals to wait before marking a device as not connected after the fritzbox reported so
        whitelist:  # A specific list of devices to track (identified by mac address)
          - B0:05:94:77:B8:3B
          - 90:CD:B6:DC:8D:61
        instant_run: true  # ... and run as soon as pnp starts
    push:
      - plugin: pnp.plugins.push.simple.Echo

sensor.DHT

plugin type extra version
pnp.plugins.pull.sensor.DHT poll dht < 0.10.0

Description

Periodically polls a dht11 or dht22 (aka am2302) for temperature and humidity readings. Polling interval is controlled by interval.

Arguments

name type opt. default description
device str yes dht22 The device to poll (one of dht22, dht11, am2302).
data_gpio int yes 17 The data gpio port where the device operates on.
humidity_offset float yes 0.0 Positive/Negative offset for humidity.
temp_offset float yes 0.0 Positive/Negative offset for temperature.

Result

{
  "humidity": 65.4  # in %
  "temperature": 23.7  # in celsius
}

Example

tasks:
  - name: dht
    pull:
      plugin: pnp.plugins.pull.sensor.DHT
      args:
        device: dht22  # Connect to a dht22
        data_gpio: 17  # DHT is connected to gpio port 17
        interval: 5m  # Polls the readings every 5 minutes
        humidity_offset: -5.0  # Subtracts 5% from the humidity reading
        temp_offset: 1.0  # Adds 1 °C to the temperature reading
        instant_run: true
    push:
      - plugin: pnp.plugins.push.simple.Echo
        selector: payload.temperature  # Temperature reading
      - plugin: pnp.plugins.push.simple.Echo
        selector: payload.humidity  # Humidity reading

sensor.MiFlora

plugin type extra version
pnp.plugins.pull.sensor.MiFlora poll miflora 0.16.0

Description

Periodically polls a Xiaomi MiFlora plant sensor for sensor readings (temperature, conductivity, light, …) via btle

Arguments

name type opt. default description
mac str no n/a

The device to poll identified by mac address.

See below for further instructions

adapter str yes hci0

The bluetooth adapter to use (if you have more than one).

Default is hci0 which is your default bluetooth device.

Note

Start a bluetooth scan to determine the MAC addresses of the sensor (look for Flower care or Flower mate entries) using this command:

$ sudo hcitool lescan
LE Scan ...
F8:04:33:AF:AB:A2 [TV] UE48JU6580
C4:D3:8C:12:4C:57 Flower mate
[...]

Result

Emits a dictionary that contains an entry for every sensor of the plant sensor device:

{
  "conductivity": 800,
  "light": 2000,
  "moisture": 42,
  "battery": 72,
  "temperature": 24.2,
  "firmaware": "3.1.9"
}

Example

- name: miflora
  pull:
    plugin: pnp.plugins.pull.sensor.MiFlora
    args:
      mac: 'C4:7C:8D:67:50:AB'  # The mac of your miflora device
      instant_run: true
  push:
    - plugin: pnp.plugins.push.simple.Echo

sensor.Sound

plugin type extra version
pnp.plugins.pull.sensor.Sound pull sound 0.15.0

Description

Listens to the microphone in realtime and searches the stream for specific sound patterns.

Practical example: I use this plugin to recognize my doorbell without tampering with the electrical device ;-)

Arguments

name type opt. default description
wav_files List[Dict] no n/a See below for a detailed description
device_index int yes None The index of the microphone device. Run pnp_record_sound --list to get the index. If not specified pyAudio will try to find a capable device
ignore_overflow bool yes true If set to True any buffer overflows due to slow realtime processing will be ignored. Otherwise an exception will be thrown and the plugin will abort.
  • wav_files

    A list of dictionaries containing the configuration for each file that contains an original sound pattern to listen for. Possible keys:

    name type opt. default description
    path str no n/a The path to the original sound file. Absolute or relative to the pnp configuration file
    mode str yes pearson Correlation/similarity method. Default is pearson. Try out which one is best for you
    offset float yes 0.0 Adjusts sensitivity for similarity. Positive means less sensitive; negative is more sensitive. You should try out 0.1 steps
    cooldown Dict yes special See below for a detailed description
  • cooldown

    Contains the cooldown configuration. Default is a cooldown period of 10 seconds and no emit of a cooldown event. Possible keys:

    name type opt. default description
    period str yes 10s Prevents the pull to emit more than one sound detection event per cool down period.
    emit_event bool yes false If set to true the end of the cooldown period will an emit as well.

Note

  • You can list your available input devices: pnp_record_sound --list
  • You can record a wav file from an input device: pnp_record_sound <out.wav> --seconds=<seconds_to_record> --index=<idx>
  • This one is _not_ pre-installed when using the docker image. Would be grateful if anyone can integrate it

Result

Will emit the event below when the correlation coefficient is above or equal the threshold. In this case the component has detected a sound that is similar to one of the given sound patterns

{
  "type": "sound"  # Type 'sound' means we detected a sound pattern
  "sound": ding,  # Name of the wav_file without path and extension. To differentiate if you have multiple patterns you listen to
  "corrcoef": 0.82,  # Correlation coefficient probably between [-1;+1] for pearson
  "threshold": 0.6  # Threshold influenced by sensitivity_offset
}

Will emit the event below when you have configured the component to send cooldown events as well.

{
  "type": "cooldown"  # Type 'cooldown' means that we previously identified a sound pattern and the cooldown has happened
  "sound": ding,  # Name of the wav_file without path and extension. To differentiate if you have multiple patterns you listen to
}

Example

tasks:
  - name: sound_detector
    pull:
      plugin: pnp.plugins.pull.sensor.Sound
      args:
        wav_files:  # The files to compare for similarity
          - path: ding.wav  # Absolute or relative (from the config) path to the wav file
            mode: std  # Use std correlation coefficient [pearson, std]; optional default is pearson
            offset: -0.5  # Adjust sensitivity. Positive means less sensitive; negative is more sensitive. Default is 0.0
          - path: doorbell.wav  # This will use default values for mode and offset (pearson, 0.0)
            cooldown:
              period: 10s  # Prevents the pull to emit more than one sound detection event every 10 seconds
              emit_event: true  # Fire an event after the actual cool down - Useful for binary_sensors to return to their 'off' state
        device_index:  # The index of the microphone devices. If not specified pyAudio will try to find a capable device
        ignore_overflow: true  # Some devices might be too slow to process the stream in realtime. Ignore any buffer overflow errors.
    push:
      - plugin: pnp.plugins.push.simple.Echo

simple.Count

plugin type extra version
pnp.plugins.pull.simple.Count poll none < 0.10.0

Description

Emits every interval seconds a counting value which runs from from_cnt to to_cnt. If to_cnt is None the counter will count to infinity (or more precise to sys.maxsize).

Arguments

name type opt. default description
from_cnt int yes 0 Starting value of the counter.
to_cnt int yes sys.maxsize End value of the counter. If not passed set to “infinity” (precise: sys.maxsize)

Result

Counter value (int).

Example

tasks:
  - name: count
    pull:
      plugin: pnp.plugins.pull.simple.Count
      args:
        interval: 1s
        from_cnt: 1
        to_cnt: 10
    push:
      plugin: pnp.plugins.push.simple.Echo

simple.Cron

plugin type extra version
pnp.plugins.pull.simple.Cron pull none 0.16.0

Description

Execute push-components based on time constraints configured by cron-like expressions.

This plugin basically wraps cronex to parse cron expressions and to check if any job is pending. See the documentation of cronex for a guide on featured/supported cron expressions.

Arguments

name type opt. default description
exppresions List[str] no n/a Cron like expressions to configure the scheduler.

Result

Imagine your cron expressions looks like this: */1 * * * * every minute. The pull will emit the text every minute every minute.

Example

tasks:
  - name: cron
    pull:
      plugin: pnp.plugins.pull.simple.Cron
      args:
        expressions:
          - "*/1 * * * * every minute"
          - "0 15 * * * 3pm"
          - "0 0 * * * midnight every day"
          - "0 16 * * 1-5 every weekday @ 4pm"
    push:
      plugin: pnp.plugins.push.simple.Echo

simple.Repeat

plugin type extra version
pnp.plugins.pull.simple.Repeat poll none < 0.10.0

Description

Emits every interval seconds the same repeat.

Arguments

name type opt. default description
repeat Any no n/a The object to emit.

Result

Emits the repeat-object as it is.

Example

tasks:
  - name: repeat
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        repeat: "Hello World"  # Repeats 'Hello World'
        interval: 1s  # Every second
    push:
      plugin: pnp.plugins.push.simple.Echo

simple.RunOnce

plugin type extra version
pnp.plugins.pull.simple.RunOnce pull none 0.23.0

Description

Takes a valid plugins.pull.Polling component and immediately executes it and ventures down the given plugins.push components. If no component to wrap is given it will simple execute the push chain.

Arguments

name type opt. default description
poll plugin yes None The polling component you want to run once. If not passed the push chain will be executed.

Result

Emits the payload of the polling component if given. Otherwise an empty dictionary will be returned.

Example

tasks:
  - name: run_once
    pull:
      plugin: pnp.plugins.pull.simple.RunOnce
    push:
      plugin: pnp.plugins.push.simple.Echo
tasks:
  - name: run_once_wrapped
    pull:
      plugin: pnp.plugins.pull.simple.RunOnce
      args:
        poll:
          plugin: pnp.plugins.pull.monitor.Stats
    push:
      plugin: pnp.plugins.push.simple.Echo

zway.ZwayPoll

plugin type extra version
pnp.plugins.pull.zway.ZwayPoll poll none < 0.10.0

Description

Pulls the specified json content from the zway rest api. The content is specified by the url, e.g. http://<host>:8083/ZWaveAPI/Run/devices will pull all devices and serve the result as a json.

Specify the polling interval by setting the argument interval. User / password combination is required when your api is protected against guest access (by default it is).

Use multiple pushes and the related selectors to extract the required content like temperature readings (see the examples section for guidance).

Arguments

name type opt. default description
url str no n/a The url to poll periodically.
user str no n/a Authentication user name.
password str no n/a Authentication password.

Note

Below are some common selector examples to fetch various metrics from various devices

Fibaro Motion Sensor

  • Temperature payload[deviceid].instances[0].commandClasses[49].data[1].val.value
  • Luminescence payload[deviceid].instances[0].commandClasses[49].data[3].val.value

Fibaro Wallplug

  • Meter payload[deviceid].instances[0].commandClasses[50].data[0].val.value

Thermostat (Danfoss / other should work as well)

  • Setpoint payload[deviceid].instances[0].commandClasses[67].data[1].val.value

Battery operated devices

  • Battery level payload[deviceid].instances[0].commandClasses[128].data.last.value

Result

Emits the content of the fetched url as it is.

Example

# Please make sure to adjust url and device ids
# Username and Password are injected from environment variables:
#     export ZWAY_USER=admin
#     export ZWAY_PASSWORD=secret_one
tasks:
  - name: zway
    pull:
      plugin: pnp.plugins.pull.zway.ZwayPoll
      args:
        url: "http://smarthome:8083/ZWaveAPI/Run/devices"
        interval: 5s
        user: !env ZWAY_USER
        password: !env ZWAY_PASSWORD
    push:
      - plugin: pnp.plugins.push.simple.Echo
        # Temperature of fibaro motion sensor
        # You can access the returned json like you would inquire the zway-api
        selector: payload[19].instances[0].commandClasses[49].data[1].val.value
      - plugin: pnp.plugins.push.simple.Echo
        # Luminiscence of fibaro motion sensor
        selector: payload[19].instances[0].commandClasses[49].data[3].val.value

Pushes

Like a pull a push does support args to initialize the instance of a push. Besides that you can optionally pass a selector to transform the incoming payload and set the unwrap option to invoke a push for each element of an iterable.

See also

Some pushes do support the envelope feature to alter the arguments for a push during runtime: Envelope

fs.FileDump

plugin type extra version
pnp.plugins.push.fs.FileDump push none < 0.10.0

Description

This push dumps the given payload to a file to the specified directory. If argument file_name is None, a name will be generated based on the current datetime (%Y%m%d-%H%M%S). If file_name is not passed (or None) you should pass extension to specify the extension of the generated file name. Argument binary_mode controls whether the dump is binary (mode=wb) or text (mode=w).

Arguments

name type opt. default env description
directory str yes . (cwd) no The target directory to store the dumps.
file_name str yes None yes The name of the file to dump. If not passed a file name will be automatically generated.
extension str yes .dump yes The extension to use when the file name is automatically generated.
binary_mode bool yes False no If set to True the file will be written in binary mode (wb); otherwise in text mode (w).

Result

Will return an absolute path to the file created.

Example

tasks:
  - name: file_dump
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        repeat: "Hello World"
    push:
      plugin: pnp.plugins.push.fs.FileDump
      args:
        directory: !env WATCH_DIR
        file_name: null  # Auto-generated file (timestamp)
        extension: ".txt"  # Extension of auto-generated file
        binary_mode: false  # text mode
      deps:
        - plugin: pnp.plugins.push.simple.Echo
tasks:
  - name: file_dump
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        repeat: "Hello World"
    push:
      plugin: pnp.plugins.push.fs.FileDump
      # Override `file_name` and `extension` via envelope.
      # Instead of an auto generated file, the file '/tmp/hello-world.hello' will be dumped.
      selector:
        data: "lambda data: data"
        file_name: hello-world
        extension: .hello
      args:
        directory: !env WATCH_DIR
        file_name: null  # Auto-generated file (timestamp)
        extension: ".txt"  # Extension of auto-generated file
        binary_mode: false  # text mode
      deps:
        - plugin: pnp.plugins.push.simple.Echo

fs.Zipper

plugin type extra version
pnp.plugins.push.fs.Zipper push none 0.21.0

Description

The push expects a directory or a file path to be passed as the payload. As long it’s a valid path it will zip the directory or the single file and return the absolute path to the created zip file.

Note

You can use a so called .zipignore file to exclude files and directories from zipping. It works - mostly - like a .gitignore file. To use a .zipignore file you have to put it in the root the folder you want to zip. An example .zipignore looks like this:

__pycache__/
*.log

This example will ignore all folder called __pycache__ and all files with the extension .log

Arguments

name type opt. default env description
source str yes n/a yes Specifies the source directory or file to zip. If not passed the source can be specified by the envelope at runtime.
out_path str yes tmp no Specifies the path to the general output path where all target zip files should be generated. If not passed the systems temp directory is used.
archive_name str yes below yes Explicitly specifies the name of the resulting archive.

The default of archive_name will be either the original file name (if you zip a single file) resp. the name of the zipped directory (if you zip a directory). In both cases the extension .zip will be added. If you do not want an extension, you have to provide the archive_name.

Result

Will return an absolute path to the zip file created.

Example

tasks:
  - name: zipper
    pull:
      plugin: pnp.plugins.pull.simple.Cron
      args:
        expressions:
          - "*/1 * * * * /path/to/backup"
    push:
      plugin: pnp.plugins.push.fs.Zipper
      args:
        out_path: !env BACKUP_DIR
      deps:
        plugin: pnp.plugins.push.simple.Echo

The next example is useful for dynamically adjusting the archive name to generate unique names for storing multiple backups:

tasks:
  - name: zipper
    pull:
      plugin: pnp.plugins.pull.simple.Cron
      args:
        expressions:
          - "*/1 * * * * /tmp/backup_folder"
    push:
      plugin: pnp.plugins.push.fs.Zipper
      args:
        out_path: !env BACKUP_DIR
      selector:
        archive_name: "lambda payload: '{}_{}'.format(now().isoformat(), 'backup.zip')"
        data: "lambda payload: payload"
      deps:
        plugin: pnp.plugins.push.simple.Echo

hass.Service

plugin type extra version
pnp.plugins.push.hass.Service push none 0.16.0

Description

Calls a home assistant service providing the payload as service-data.

Arguments

name type opt. default env description
url str no n/a no The url to your home assistant instance (e.g. http://hass:8123)
token str no n/a no The long live access token to get access to home assistant.
domain str no n/a no The domain of the service to call.
service str no n/a no The name of the service to call.
timeout int|float yes 5.0 no Tell the request to stop waiting for a response after given number of seconds.

Note

Create a long lived access token: Home Assistant documentation

Result

Returns the payload as-is for better chaining (this plugin can’t add any useful information).

Example

# Calls the frontend.set_theme service to oscillate between a "light" and a "dark" theme
tasks:
  - name: hass_service
    pull:
      plugin: pnp.plugins.pull.simple.Count
      args:
        interval: 10s
    push:
      plugin: pnp.plugins.push.hass.Service
      selector:
        name: "lambda i: 'clear' if i % 2 == 0 else 'dark'"
      args:
        url: http://localhost:8123
        token: !env HA_TOKEN
        domain: frontend
        service: set_theme
# Calls the notify.notify service to send a message with the actual counter
tasks:
  - name: hass_service
    pull:
      plugin: pnp.plugins.pull.simple.Count
      args:
        interval: 10s
    push:
      plugin: pnp.plugins.push.hass.Service
      selector:
        message: "lambda i: 'Counter: ' + str(i)"
      args:
        url: http://localhost:8123
        token: !env HA_TOKEN
        domain: notify
        service: notify

http.Call

plugin type extra version
pnp.plugins.push.http.Call push none 0.21.0

Description

Makes a request to a http resource.

Arguments

name type opt. default env description
url str no n/a yes Request url.
method str yes GET yes The http method to use for the request. Must be a valid http method (GET, POST, …).
fail_on_error bool yes False yes If True the push will fail on a http status code <> 2xx. This leads to an error message recorded into the logs and no further execution of any dependencies.
provide_response bool yes False no If True the push will not return the payload as it is, but instead provide the response status_code, fetched url content and a flag if the url content is a json response. This is useful for other push instances in the dependency chain.

Result

Will return the payload as it is for easy chaining of dependencies. If provide_response is True the push will return a dictionary that looks like this:

{
  "status_code": 200,
  "data": "fetched url content",
  "is_json": False
}

Example

# Simple example calling the built-in rest server
# Oscillates between http method GET and POST. Depending on the fact if the counter is even or not.
api:
  port: 9999
tasks:
  - name: http_call
    pull:
      plugin: pnp.plugins.pull.simple.Count
      args:
        interval: 5s
    push:
      plugin: pnp.plugins.push.http.Call
      selector:
        data:
          counter: "lambda data: data"
        method: "lambda data: 'POST' if int(data) % 2 == 0 else 'GET'"
      args:
        url: http://localhost:9999/counter
  - name: rest_server
    pull:
      plugin: pnp.plugins.pull.http.Server
      args:
        prefix_path: counter
        allowed_methods:
          - GET
          - POST
    push:
      plugin: pnp.plugins.push.simple.Echo
# Demonstrates the use of `provide_response` set to True.
# Call will return a response object to dependent push instances.
api:
  port: 9999
tasks:
  - name: http_call
    pull:
      plugin: pnp.plugins.pull.simple.Count
      args:
        interval: 5s
    push:
      plugin: pnp.plugins.push.http.Call
      args:
        url: http://localhost:9999/counter
        provide_response: true
      deps:
        plugin: pnp.plugins.push.simple.Echo
  - name: rest_server
    pull:
      plugin: pnp.plugins.pull.http.Server
      args:
        prefix_path: counter
        allowed_methods:
          - GET
    push:
      plugin: pnp.plugins.push.simple.Nop

ml.FaceR

plugin type extra version
pnp.plugins.push.ml.FaceR push faceR < 0.10.0

Description

FaceR (short one for face recognition) tags known faces in images. Output is the image with all faces tagged whether with the known name or an unknown_label. Default for unknown ones is Unknown.

Known faces can be ingested either by a directory of known faces (known_faces_dir) or by mapping of known_faces (dictionary: name -> [list of face files]).

The payload passed to the push method is expected to be a valid byte array that represents an image in memory. Please see the example section for loading physical files into memory.

Note

This one is not pre-installed when using the docker image. Would be grateful if anyone can integrate it

Arguments

name type opt. default env description
known_faces Dict[str, str] yes None no Mapping of a person’s name to a list of images that contain the person’s face.
known_faces_dir str yes None no A directory containing images with known persons (file_name -> person’s name).
unknown_label str yes Unknown no Tag label of unknown faces.
lazy bool yes False no If set to True the face encodings will be loaded when the first push is executed (lazy); otherwise the encodings are loaded when the plugin is initialized (during __init__).

Note

You need to specify either known_faces or known_faces_dir

Result

Will return a dictionary that contains the bytes of the tagged image (key tagged_image) and metadata (no_of_faces, known_faces)

{
  'tagged_image': <bytes of tagged image>
  'no_of_faces': 2
  'known_faces': ['obama']
}

Example

tasks:
  - name: faceR
    pull:
      plugin: pnp.plugins.pull.fs.FileSystemWatcher
      args:
        path: "/tmp/camera"
        recursive: true
        patterns: "*.jpg"
        ignore_directories: true
        case_sensitive: false
        events: [created]
        load_file: true
        mode: binary
        base64: false
    push:
      plugin: pnp.plugins.push.ml.FaceR
      args:
        known_faces_dir: "/tmp/faces"
        unknown_label: "don't know him"
        lazy: true

mqtt.Discovery

plugin type extra version
pnp.plugins.push.mqtt.Discovery push none 0.13.0

Description

Pushes an entity to home assistant by publishing it to an mqtt broker. The entity will be enabled to be auto discovered by home assistant.

Please see the home assistant docs about mqtt auto discovery.

The mqtt topic is structured like this:

<discovery_prefix>/<component>/[<node_id>/]<object_id>/[config|state]

You may also publish attributes besides your state. attributes can be passed by the envelope via runtime. Please see the examples section for further reference.

Arguments

name type opt. default env description
discovery_prefix str no n/a no The prefix for the topic.
component str no n/a no The component / type of the entity, e.g. sensor, light, …
config Dict no n/a no A dictionary of configuration items to configure the entity (e.g. icon -> mdi:soccer)
object_id str yes None yes The ID of the device. This is only to allow for separate topics for each device and is not used for the entity_id.
node_id str yes None yes A non-interpreted structuring entity to structure the MQTT topic.

Note

Inside the config section you can reference some variables to make the configuration easier. The following variables can be referenced via the dictmentor syntax "{{var::<variable>}}":

  • discovery_prefix
  • component
  • object_id
  • node_id
  • base_topic
  • config_topic
  • state_topic
  • json_attributes_topic

Please see the examples section on how to use that.

Result

Returns the payload as-is for better chaining (this plugin can’t add any useful information).

Example

tasks:
  - name: fitbit_steps
    pull:
      plugin: pnp.plugins.pull.fitbit.Current
      args:
        config: !env FITBIT_AUTH
        instant_run: true
        interval: 5m
        resources:
          - activities/steps
    push:
      - plugin: pnp.plugins.push.mqtt.Discovery
        selector: "data.get('activities/steps')"
        args:
          host: localhost
          discovery_prefix: homeassistant
          component: sensor
          object_id: fitbit_steps
          config:
            name: "{{var::object_id}}"
            icon: "mdi:soccer"
name: service_probing
pull:
  plugin: pnp.plugins.pull.net.PortProbe
  args:
    server: server  # Server name or ip address, default is localhost
    port: 3000  # The port to probe if somebody is listening
    timeout: 5
    interval: 2m  # Probe the port every five seconds ...
    instant_run: true  # ... and run as soon as pnp starts
push:
  - plugin: pnp.plugins.push.mqtt.Discovery
    selector:
      data: "lambda data: 'OFF' if data.get('reachable') else 'ON'"
      object_id: "service"
      attributes:
        friendly_name: My Service
        icon: mdi:monitor-dashboard
    args:
      host: !env MQTT_HOST
      discovery_prefix: !env MQTT_BASE_TOPIC
      component: binary_sensor
      config:
        name: "{{var::object_id}}"
        device_class: problem

mqtt.Publish

plugin type extra version
pnp.plugins.push.mqtt.Publish push none < 0.10.0

Description

Will push the given payload to a mqtt broker e.g. mosquitto. The broker is specified by host and port. In addition a topic needs to be specified were the payload will be pushed to (e.g. home/living/thermostat).

The payload will be pushed as it is. No transformation is applied. If you need to some transformations, use the selector.

Arguments

name type opt. default env description
host str no n/a no The host where the mqtt broker is running.
port int yes 1883 no The port where the mqtt broker is listening
topic Dict yes None yes The topic to subscribe to. If set to None the envelope of the payload has to contain a topic key or the push will fail. If both exists the topic from the envelope will overrule the __init__ one.
retain bool yes False no If set to True will mark the message as retained. See the mosquitto man page for further guidance: https://mosquitto.org/man/mqtt-7.html.
multi bool yes False no If set to True the payload is expected to be a dictionary. Each item of that dictionary will be send individually to the broker. The key of the item will be appended to the configured topic.

Result

For chaining of pushes the payload is simply returned as it is.

Example

# Demonstrates the basic mqtt.Publish

tasks:
  - name: mqtt
    pull:
      plugin: pnp.plugins.pull.simple.Count
    push:
      # Will push the counter to the 'home/counter/state' topic
      plugin: pnp.plugins.push.mqtt.Publish
      args:
        host: localhost
        topic: home/counter/state
        port: 1883
        retain: true
# Demonstrates the topic override via envelope

tasks:
  - name: mqtt
    pull:
      plugin: pnp.plugins.pull.simple.Count
      args:
        interval: 1s
    push:
      plugin: pnp.plugins.push.mqtt.Publish
      # Lets override the topic via envelope mechanism
      # Will publish even counts on topic 'even' and uneven counts on 'uneven'
      selector:
        data: "lambda data: data"
        topic: "lambda data: 'test/even' if int(data) % 2 == 0 else 'test/uneven'"
      args:
        host: localhost
        port: 1883
# Demonstrates the use of multi push

tasks:
  - name: mqtt
    pull:
      # Periodically gets metrics about your system
      plugin: pnp.plugins.pull.monitor.Stats
      args:
        instant_run: true
        interval: 10s
    push:
      # Push them to the mqtt
      plugin: pnp.plugins.push.mqtt.Publish
      args:
        host: localhost
        topic: devices/localhost/
        port: 1883
        retain: true
        # Each item of the payload-dict (cpu_count, cpu_usage, ...) will be pushed to the broker as multiple items.
        # The key of the item will be appended to the topic, e.g. `devices/localhost/cpu_count`.
        # The value of the item is the actual payload.
        multi: true

notify.Slack

plugin type extra version
pnp.plugins.push.notify.Slack push none 0.20.0

Description

Sends a message to a given Slack channel.

You can specify the channel, the name of the poster, the icon of the poster and a list of users to ping.

Arguments

name type opt. default env description
api_key str no n/a no The api key of your slack oauth token.
channel str no n/a yes The channel to post the message to.
username str yes PnP yes The username of the message poster.
emoji str yes :robot: yes The emoji of the message poster.
ping_users List[str] yes None yes A list of users to ping when the message is posted. By default no one is ping’d.

Result

Will return the payload as it is for easy chaining of dependencies.

Example

tasks:
  - name: slack
    pull:
      plugin: pnp.plugins.pull.simple.Count  # Let's count
      args:
        wait: 10
    push:
      - plugin: pnp.plugins.push.notify.Slack
        selector:
          data: "lambda data: 'This is the counter: {}'.format(data)"
          # You can override the channel if necessary
          # channel: "lambda data: 'test_even' if int(data) % 2 == 0 else 'test_odd'"
          # You can override the username if necessary
          # username: the_new_user
          # You can override the emoji if necessary
          # emoji: ':see_no_evil:'
          # You can override the ping users if necessary
          # ping_users:
          #   - clone_dede
        args:
          api_key: !env SLACK_API_KEY  # Your slack api key.
          channel: test  # The channel to post to. Mandatory. Overridable by envelope.
          username: slack_tester  # The username to show. Default is PnP. Overridable by envelope
          emoji: ':pig:'  # The emoji to use. Default is :robot: . Overridable by envelope
          ping_users:  # The users you want to ping when the message is send. Overridable by envelope
            - dede

simple.Echo

plugin type extra version
pnp.plugins.push.simple.Echo push none < 0.10.0

Description

Simply log the passed payload to the default logging instance.

Result

Will return the payload as it is for easy chaining of dependencies.

Example

tasks:
  - name: echo
    pull:
      plugin: pnp.plugins.pull.simple.Count
      args:
        interval: 1s
        from_cnt: 1
        to_cnt: 10
    push:
      plugin: pnp.plugins.push.simple.Echo

simple.Execute

plugin type extra version
pnp.plugins.push.simple.Execute push none 0.12.0

Description

Executes a command with given arguments in a shell of the operating system. Both command and args may include placeholders (e.g. {{placeholder}}) which are injected at runtime by passing the specified payload after selector transformation. Please see the examples section for further details.

Will return the exit code of the command and optionally the output from stdout and stderr.

Arguments

name type opt. default env description
command str no n/a no The command to execute. May contain placeholders.
args List[str] yes [] no The arguments to pass to the command. Default is no arguments. May contain placeholders.
cwd str yes special no Specifies where to execute the command (working directory). Default is the folder where the invoked pnp configuration file is located.
timeout str|float yes 5s no Specifies how long the worker should wait for the command to finish.
capture bool yes False no If True stdout and stderr output is captured, otherwise not.

Result

Returns a dictionary that contains the return_code and optionally the output from stdout and stderr whether capture is set or not. The output is a list of lines.

{
  "return_code": 0
  "stdout": ["hello", "dude!"]
  "stderr": []
}

Example

tasks:
  - name: execute
    pull:
      plugin: pnp.plugins.pull.simple.Count
      args:
        interval: 1s
        from_cnt: 1
    push:
      plugin: pnp.plugins.push.simple.Execute
      selector:
        command: echo
        count: "lambda data: str(data)"
        labels:
          prefix: "The actual count is"
          iter: iterations
      args:
        command: "{{command}}"  # The command to execute (passed by selector)
        args:
          - "{{labels.prefix}}"
          - "{{count}}"  # The named argument passed at runtime by selector
          - "{{labels.iter}}"
        timeout: 2s
        cwd:  # None -> pnp-configuration directory
        capture: true  # Capture stdout and stderr
      deps:
        - plugin: pnp.plugins.push.simple.Echo
# How to escape " correctly

tasks:
  - name: execute
    pull:
      plugin: pnp.plugins.pull.simple.Count
      args:
        interval: 1s
        from_cnt: 1
    push:
      plugin: pnp.plugins.push.simple.Execute
      selector:
        command: echo
        salutation: "\"hello you\""
      args:
        command: "{{command}}"  # The command to execute (passed by selector)
        args:
          - "{{salutation}}"
        timeout: 2s
        cwd:  # None -> pnp-configuration directory
        capture: true  # Capture stdout and stderr
      deps:
        - plugin: pnp.plugins.push.simple.Echo
# Capturing multiline output
tasks:
  - name: execute
    pull:
      plugin: pnp.plugins.pull.simple.Count
      args:
        interval: 1s
        from_cnt: 1
    push:
      plugin: pnp.plugins.push.simple.Execute
      selector:
        command: cat multiline.txt
      args:
        command: "{{command}}"  # The command to execute (passed by selector)
        cwd:  # None -> pnp-configuration directory
        capture: true  # Capture stdout and stderr
      deps:
        - plugin: pnp.plugins.push.simple.Echo

simple.Nop

plugin type extra version
pnp.plugins.push.simple.Nop push none < 0.10.0

Description

Executes no operation at all. A call to push(…) just returns the payload. This push is useful when you only need the power of the selector for dependent pushes.

See the example section for an example.

Nop = No operation OR No push ;-)

Result

Will return the payload as it is for easy chaining of dependencies.

Example

tasks:
  - name: nop
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        interval: 1s
        repeat:
          - 1
          - 2
          - 3
    push:
      plugin: pnp.plugins.push.simple.Nop
      selector: "data + [4]"
      deps:
        plugin: pnp.plugins.push.simple.Echo
        unwrap: true

simple.Wait

plugin type extra version
pnp.plugins.push.simple.Wait push none 0.19.0

Description

Performs a sleep operation and waits for some time to pass by.

Arguments

name type opt. default env description
wait_for float|str no n/a no The time to wait for before proceeding. You can pass literals such as 5s, 1m; ints such as 1, 2, 3 or floats such as 0.5. In the end everything will be converted to it’s float representation (1 => 1.0; 5s => 5.0; 1m => 60.0; 0.5 => 0.5)

Result

Will return the payload as it is for easy chaining of dependencies.

Example

tasks:
  - name: wait
    pull:
      plugin: pnp.plugins.pull.simple.Count  # Let's count
      args:
        interval: 1s
    push:
      - plugin: pnp.plugins.push.simple.Echo
        selector: "'START WAITING: {}'.format(payload)"
        deps:
          - plugin: pnp.plugins.push.simple.Wait
            args:
              wait_for: 3s
            deps:
              - plugin: pnp.plugins.push.simple.Echo
                selector: "'END WAITING: {}'.format(payload)"

storage.Dropbox

plugin type extra version
pnp.plugins.push.storage.Dropbox push dropbox 0.12.0

Description

Uploads a provided file to the configured dropbox account.

Arguments

name type opt. default env description
api_key str no n/a no The api key to your dropbox account/app.
target_file_name str yes None yes The file path on the server where to upload the file to. If not specified you have to specify this argument during runtime by setting it in the envelope.
created_shared_link bool yes True no If set to True, the push will create a publicly available link to your uploaded file.

Result

Returns a dictionary that contains metadata information about your uploaded file. If you uploaded a file named 42.txt, your result will be similar to the one below:

{
  "name": "42.txt",
  "id": "HkdashdasdOOOOOadss",
  "content_hash": "aljdhfjdahfafuhu489",
  "size": 42,
  "path": "/42.txt",
  "shared_link": "http://someserver/tosomestuff/asdasd?dl=1",
  "raw_link": "http://someserver/tosomestuff/asdasd?raw=1"
}

shared_link is the one that is publicly available (but only if you know the link). Same for raw_link, but this link will return the raw file (without the dropbox overhead).

Both are None if create_shared_link is set to False.

Example

tasks:
  - name: dropbox
    pull:
      plugin: pnp.plugins.pull.fs.FileSystemWatcher
      args:
        path: "/tmp"
        ignore_directories: true
        events:
          - created
          - modified
        load_file: false
    push:
      - plugin: pnp.plugins.push.storage.Dropbox
        args:
          api_key: !env DROPBOX_API_KEY
          create_shared_link: true  # Create a publicly available link
        selector:
          data: "lambda data: data.source"  # Absolute path to file
          target_file_name: "lambda data: basename(data.source)"  # File name only

timedb.InfluxPush

plugin type extra version
pnp.plugins.push.timedb.InfluxPush push none < 0.10.0

Description

Pushes the given payload to an influx database using the line protocol. You have to specify host, port, user, password and the database.

The protocol is basically a string that will be augmented at push-time with data from the payload. E.g. {payload.metric},room={payload.location} value={payload.value} assumes that payload contains metric, location and value.

Arguments

name type opt. default env description
host str no n/a no The host where influx service is running.
port int no n/a no The port where the influx service is listening on.
user str no n/a no Username to use for authentication.
password str no n/a no Related password.
database str no n/a no The database to store the measurement.
protocol str no n/a no Line protocol template (augmented with payload-data).

Result

For the ability to chain multiple pushes together the payload is simply returned as is.

Example

tasks:
  - name: influx_push
    pull:
      plugin: pnp.plugins.pull.mqtt.Subscribe
      args:
        host: mqtt
        topic: home/#
    push:
      plugin: pnp.plugins.push.timedb.InfluxPush
      selector:
        data: "lambda data: data"
      args:
        host: influxdb
        port: 8086
        user: root
        password: secret
        database: home
        # This assumes that your topics are structured like this:
        # home/<room e.g. living>/<sensor e.g. humidity>
        protocol: "{payload.levels[2]},room={payload.levels[1]} value={payload.payload}"

UDFs

New in version 0.14.0.

All udfs do share the following base arguments:

name type opt. default description
throttle str/float yes None If set to a valid duration literal (e.g. 5m) the return value of the called functions will be cached for the given amount of time.

Note

Please note that even when an udf does not require arguments, you anyway have to specify the args: section. Otherwise it will be interpreted as a regular function and not as a UDF.

...
udfs:
  - name: fsize
    args:
tasks:
  ...

hass.State

plugin type extra version
pnp.plugins.udf.hass.State udf none 0.14.0

Description

Fetches the state of an entity from home assistant by a rest-api call.

Arguments

name type opt. default description
url str no n/a The url to your home assistant instance (e.g. http://hass:8123)
token str no n/a The long lived access token to get access to home assistant
timeout float yes 5.0 Tell the request to abort the waiting for a response after given number of seconds

Note

Create a long lived access token: Home Assistant documentation

Call Arguments

name type opt. default description
entity_id str no n/a The entity to fetch the state
attribute str yes None Optionally you can fetch the state of one of the entity attributes. Not passed will fetch the state of the entity

Result

Returns the current state of the entity or one of it’s attributes. If the entity is not known to home assistant an exception is raised. In case of an attribute does not exists, None will be returned instead to signal it’s absence.

Example

udfs:
  # Defines the udf. name is the actual alias you can call in selector expressions.
  - name: hass_state
    plugin: pnp.plugins.udf.hass.State
    args:
      url: http://localhost:8123
      token: !env HA_TOKEN
tasks:
  - name: hass_state
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        repeat: "Hello World"  # Repeats 'Hello World'
        interval: 1s  # Every second
    push:
      - plugin: pnp.plugins.push.simple.Echo
        # Will only print the data when attribute azimuth of the sun component is above 200
        selector: "'azimuth is greater than 200' if hass_state('sun.sun', attribute='azimuth') > 200.0 else SUPPRESS"
      - plugin: pnp.plugins.push.simple.Echo
        # Will only print the data when the state of the sun component is above 'above_horizon'
        selector: "'above_horizon' if hass_state('sun.sun') == 'above_horizon' else SUPPRESS"

simple.Counter

plugin type extra version
pnp.plugins.udf.simple.Counter udf none 0.14.0

Description

Memories a counter value which is increased everytime you call the udf.

Arguments

name type opt. default description
init int yes 0 The initialization value of the counter.

Result

Returns the current counter.

Example

udfs:
  # Defines the udf. name is the actual alias you can call in selector expressions.
  - name: counter
    plugin: pnp.plugins.udf.simple.Counter
    args:
tasks:
  - name: countme
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        repeat: "Hello World"  # Repeats 'Hello World'
        interval: 1s  # Every second
    push:
      - plugin: pnp.plugins.push.simple.Echo
        selector:
          data: "lambda data: data"
          count: "lambda data: counter()"  # Calls the udf

simple.FormatSize

plugin type extra version
pnp.plugins.udf.simple.FormatSize udf none 0.14.0

Description

Returns the size of a file (or whatever) as a human readable size (e.g. bytes, KB, MB, GB, TB, PB). The input is expected to be at byte scale.

Call Arguments

name type opt. default description
size_in_bytes int|float no n/a The size in bytes to format to a human readable format.

Result

Returns the argument in a human readable size format.

Example

udfs:
  # Defines the udf. name is the actual alias you can call in selector expressions.
  - name: fsize
    plugin: pnp.plugins.udf.simple.FormatSize
    args:
tasks:
  - name: format_size
    pull:
      plugin: pnp.plugins.pull.fs.Size
      args:
        instant_run: true
        interval: 5s
        fail_on_error: false
        paths:
          logs: /var/log  # directory - recursively determines size
          copy: /bin/cp  # file
    push:
      plugin: pnp.plugins.push.simple.Nop
      selector: "[(k ,v) for k, v in data.items()]"  # Transform the dictionary into a list
      deps:
        plugin: pnp.plugins.push.simple.Echo
        unwrap: true  # Call the push for each individual item in the list
        selector:
          object: "lambda d: d[0]"
          data: "lambda d: fsize(d[1])"

simple.Memory

plugin type extra version
pnp.plugins.udf.simple.Memory udf none 0.14.0

Description

Returns a previously memorized value when called.

Arguments

name type opt. default description
init Any yes None The initial memory of the plugin. When not set initially the first call will return the value of new_memory, if specified; otherwise None.

Call Arguments

name type opt. default description
new_memory Any no None After emitting the current memorized value the current memory is overwritten by this value. Will only be overwritten if the parameter is specified.

Result

Returns the memorized value.

Example

udfs:
  - name: mem
    plugin: pnp.plugins.udf.simple.Memory
    args:
tasks:
  - name: countme
    pull:
      plugin: pnp.plugins.pull.simple.Count
      args:
        from_cnt: 1
        interval: 1s  # Every second
    push:
      - plugin: pnp.plugins.push.simple.Echo
        # Will memorize every uneven count
        selector: "mem() if data % 2 == 0 else mem(new_memory=data)"
udfs:
  - name: mem
    plugin: pnp.plugins.udf.simple.Memory
    args:
tasks:
  - name: countme
    pull:
      plugin: pnp.plugins.pull.simple.Count
      args:
        from_cnt: 1
        interval: 1s  # Every second
    push:
      - plugin: pnp.plugins.push.simple.Echo
        # Will memorize every uneven count
        selector: "mem() if data % 2 == 0 else mem(new_memory=data)"

Appendix

Fitbit Authentication

To request data from the fitbit account it is necessary to create an app. Go to dev.fitbit.com. Under Manage go to Register an App.

For the application website and organization website, name it anything starting with http:// or https://. Secondly, make sure the OAuth 2.0 Application Type is Personal. Lastly, make sure the Callback URL is http://127.0.0.1:8080/ in order to get our Fitbit API to connect properly. After that, click on the agreement box and submit. You will be redirected to a page that contains your Client ID and your Client Secret.

Next we need to acquire your initial access- and refresh-token.

git clone https://github.com/orcasgit/python-fitbit.git
cd python-fitbit
python3 -m venv venv
source venv/bin/activate
pip install -r dev.txt
./gather_keys_oauth2.py <client_id> <client_secret>

You will be redirected to your browser and asked to login to your fitbit account. Next you can restrict the app to certain data. If everything is fine, your console window should print your access- and refresh-token and also expires_at.

Put your client_id, client_secret, access_token, refresh_token and expires_at to a yaml file and use this file-path as the config argument of this plugin. Please see the example below:

access_token: <access_token>
client_id: <client_id>
client_secret: <client_secret>
expires_at: <expires_at>
refresh_token: <refresh_token>

That’s it. If your token expires it will be refreshed automatically by the plugin.

Docker

Prelude

Each new version of pnp will also provide a new docker image.

# Run your container interactive and delete it after use
docker run -it --rm -v <path/to/config.yaml>:/config/config.yaml hazard/pnp:latest

# -d will run the container as a daemon in the background
docker run -d -v <path/to/config.yaml>:/config/config.yaml hazard/pnp:latest

The image tag latest points to the latest release. You are encouraged to point to a specific tag.

Use a specific tag

To use a specific docker image tag you need to replace latest by some valid tag. By default every version gets its own tag.

For example if you want to use pnp version 0.21.0

docker run -it --rm -v <path/to/config.yaml>:/config/config.yaml hazard/pnp:0.21.0

You can also checkout the Dockerhub for available image tags. You will also find arm images to run pnp on a raspberry (which is what I do primarily).

Mounting a config directory

Sometimes you need to add auxiliary files like a logging configuration, authentication tokens, dictmentor resources, …. To make this possible you can mount a directory instead of just a single configuration file.

docker run -it --rm -v <path/to/folder:/config hazard/pnp:0.21.0

If you do it this way your main configuration has to be named config.yaml.

Examples

Compiles a set of example configs to inspire you what you can achieve by using pnp.

Backup to dropbox

# Every sunday at 1am a backup of the specified directory is created
# and stored at the dropbox service.

tasks:
  - name: cron_backup
    pull:
      plugin: pnp.plugins.pull.simple.Cron
      args:
        expressions:
          - "0 1 * * SUN /tmp"
    push:
      plugin: pnp.plugins.push.fs.Zipper
      deps:
        - plugin: pnp.plugins.push.storage.Dropbox
          args:
            api_key: !env DROPBOX_API_KEY
            create_shared_link: false
          selector:
            data: "lambda payload: payload"
            target_file_name: "lambda payload: '{}_{}'.format(str(now()), basename(payload))"

Expose internet speed to home assistant

# We use the api to test our speed when we want
#   curl -X POST "http://localhost:80/trigger?task=speedtest" -H "accept: application/json"
api:
  port: 80
tasks:
  - name: speedtest
    pull:
      plugin: pnp.plugins.pull.net.Speedtest
      args:
        interval: "0 6 * * *"  # Run every morning at 6 am
    push:
      - plugin: pnp.plugins.push.mqtt.Discovery
        selector: "data.get('download_speed_mbps')"
        args:
          host: localhost
          discovery_prefix: homeassistant
          component: sensor
          object_id: speedtest_download
          config:
            name: "{{var::object_id}}"
            icon: "mdi:cloud-download-outline"
            state_topic: "{{var::state_topic}}"
            unit_of_measurement: "Mbps"
      - plugin: pnp.plugins.push.mqtt.Discovery
        selector: "data.get('upload_speed_mbps')"
        args:
          host: localhost
          discovery_prefix: homeassistant
          component: sensor
          object_id: speedtest_upload
          config:
            name: "{{var::object_id}}"
            icon: "mdi:cloud-upload-outline"
            state_topic: "{{var::state_topic}}"
            unit_of_measurement: "Mbps"
      - plugin: pnp.plugins.push.mqtt.Discovery
        selector: "data.get('ping_latency')"
        args:
          host: localhost
          discovery_prefix: homeassistant
          component: sensor
          object_id: speedtest_ping
          config:
            name: "{{var::object_id}}"
            icon: "mdi:lan-pending"
            state_topic: "{{var::state_topic}}"
            unit_of_measurement: "ms"
      - plugin: pnp.plugins.push.mqtt.Discovery
        selector:
          data: "lambda data: data.get('server').get('name')"
          attributes:
            isp: "lambda data: data.get('client').get('isp')"
            rating: "lambda data: data.get('client').get('rating')"
            host: "lambda data: data.get('server').get('host')"
            result_image: "lambda data: data.get('result_image')"
        args:
          host: localhost
          discovery_prefix: homeassistant
          component: sensor
          object_id: speedtest_host
          config:
            name: "{{var::object_id}}"
            icon: "mdi:cloud-sync-outline"
            state_topic: "{{var::state_topic}}"
            json_attributes_topic: "{{var::json_attributes_topic}}"

Fitbit to home assistant

# Polls your fitbit account for the step count and devices every 5 minutes
# and publishes those metrics to home assistant via mqtt discovery.
#    https://www.home-assistant.io/docs/mqtt/discovery/
#
# Please point your environment variable `FITBIT_AUTH` to your authentication
# configuration file.

- name: fitbit_steps
  pull:
    plugin: pnp.plugins.pull.fitbit.Current
    args:
      config: !env FITBIT_AUTH
      instant_run: true  # Run as soon as pnp starts
      interval: 5m
      resources:
        - activities/steps
  push:
    # Adds a sensor.fitbit_steps to home assistant
    - plugin: pnp.plugins.push.mqtt.Discovery
      selector: "data.get('activities/steps')"
      args:
        host: localhost
        discovery_prefix: homeassistant
        component: sensor
        object_id: fitbit_steps
        config:
          name: "{{var::object_id}}"
          icon: "mdi:soccer"

- name: fitbit_devices_battery
  pull:
    plugin: pnp.plugins.pull.fitbit.Devices
    args:
      config: !env FITBIT_AUTH
      instant_run: true
      interval: 5m
  push:
    # Adds sensor.fb_<device>_battery for each device attached to your fitbit account
    - plugin: pnp.plugins.push.mqtt.Discovery
      selector:
        data: "lambda data: data.get('battery_level')"
        object_id: "lambda data: 'fb_{}_battery'.format(data.get('device_version', '').replace(' ', '_').lower())"
      unwrap: true
      args:
        host: localhost
        discovery_prefix: homeassistant
        component: sensor
        config:
          name: "{{var::object_id}}"
          device_class: "battery"
          unit_of_measurement: "%"
    # Adds sensor.fb_<device>_lastsync for each device attached to your fitbit account
    - plugin: pnp.plugins.push.mqtt.Discovery
      selector:
        data: "lambda data: data.get('last_sync_time')"
        object_id: "lambda data: 'fb_{}_lastsync'.format(data.get('device_version', '').replace(' ', '_').lower())"
      unwrap: true
      args:
        host: localhost
        discovery_prefix: homeassistant
        component: sensor
        config:
          name: "{{var::object_id}}"

Image face recognition

# Watches the directory '/tmp/camera' for file changes on image files and
# publishes them to a message queue (base64 encoded).
#
# Then the image data is pulled from the queue and a face recognition
# is executed. The result is a tagged file (known and unknown persons) which
# will be dumped to the specified directory.
#

- name: image_watcher
  pull:
    plugin: pnp.plugins.pull.fs.FileSystemWatcher
    args:
      path: "/tmp/camera"
      events: [created]
      ignore_directories: true
      patterns: ['*.jpeg', '*.jpg', '*.png']
      load_file: true
      base64: true

  push:
    plugin: pnp.plugins.push.mqtt.Publish
    selector: payload.file.content
    args:
      host: localhost
      topic: camera/images

- name: image_processor
  pull:
    plugin: pnp.plugins.pull.mqtt.Subscribe
    args:
      host: localhost
      topic: camera/images
  push:
    plugin: pnp.plugins.push.ml.FaceR
    selector: b64decode(payload.payload)
    args:
      known_faces_dir: '/tmp/faces'
      lazy: true
    deps:
      - plugin: pnp.plugins.push.simple.Echo
        selector: (payload['known_faces'], payload['no_of_faces'])
      - plugin: pnp.plugins.push.fs.FileDump
        selector: payload['tagged_image']
        args:
          directory: '.'
          extension: '.png'

Miflora to home assistant

# Polls a miflora device at :20 and publishes its reading
# to home assistant via mqtt discovery.
#    https://www.home-assistant.io/docs/mqtt/discovery/

- name: miflora
  pull:
    plugin: pnp.plugins.pull.sensor.MiFlora
    args:
      mac: 'C4:7C:8D:67:50:AB'  # The mac of your miflora device
      instant_run: false
      interval: '20 * * * *'
  push:
    - plugin: pnp.plugins.push.simple.Nop
      selector: "data if data.get('conductivity') else SUPPRESS"
      deps:
        - plugin: pnp.plugins.push.mqtt.Discovery
          selector:
            data: "lambda data: data.get('conductivity')"
            object_id: "miflora_conductivity"
          args:
            host: localhost
            discovery_prefix: homeassistant
            component: sensor
            config:
              name: "{{var::object_id}}"
              unit_of_measurement: "µS/cm"
              icon: mdi:flash-circle
              friendly_name: Conductivity
    - plugin: pnp.plugins.push.simple.Nop
      selector: "data if data.get('light') else SUPPRESS"
      deps:
        - plugin: pnp.plugins.push.mqtt.Discovery
          selector:
            data: "lambda data: data.get('light')"
            object_id: "miflora_light_intensity"
          args:
            host: localhost
            discovery_prefix: homeassistant
            component: sensor
            config:
              name: "{{var::object_id}}"
              unit_of_measurement: "lx"
              icon: mdi:white-balance-sunny
              friendly_name: Light intensity
    - plugin: pnp.plugins.push.simple.Nop
      selector: "data if data.get('temperature') else SUPPRESS"
      deps:
        - plugin: pnp.plugins.push.mqtt.Discovery
          selector:
            data: "lambda data: data.get('temperature')"
            object_id: "miflora_temperature"
          args:
            host: localhost
            discovery_prefix: homeassistant
            component: sensor
            config:
              name: "{{var::object_id}}"
              unit_of_measurement: "°C"
              icon: mdi:thermometer
              friendly_name: Temperature
    - plugin: pnp.plugins.push.simple.Nop
      selector: "data if data.get('battery') else SUPPRESS"
      deps:
        - plugin: pnp.plugins.push.mqtt.Discovery
          selector:
            data: "lambda data: data.get('battery')"
            object_id: "miflora_battery"
          args:
            host: localhost
            discovery_prefix: homeassistant
            component: sensor
            config:
              name: "{{var::object_id}}"
              unit_of_measurement: "%"
              device_class: battery
              friendly_name: Battery
    - plugin: pnp.plugins.push.simple.Nop
      selector: "data if data.get('moisture') else SUPPRESS"
      deps:
        - plugin: pnp.plugins.push.mqtt.Discovery
          selector:
            data: "lambda data: data.get('moisture')"
            object_id: "miflora_moisture"
          args:
            host: localhost
            discovery_prefix: homeassistant
            component: sensor
            config:
              name: "{{var::object_id}}"
              unit_of_measurement: "%"
              icon: mdi:water-percent
              friendly_name: Moisture

Monitoring

# Client component
# Sends statistics about the host system (like cpu usage, ram usage, ...)
# to a mqtt broker every 30 seconds.

tasks:
  - name: stats
    pull:
      plugin: pnp.plugins.pull.monitor.Stats
      args:
        interval: 30s
        instant_run: true
    push:
      plugin: pnp.plugins.push.mqtt.Publish
      args:
        host: !env MQTT_HOST
        topic: devices/my_name/stats
        port: 1883
        retain: true
        # Each item of the payload-dict (cpu_count, cpu_usage, ...) will be pushed to the broker as multiple items.
        # The key of the item will be appended to the topic, e.g. `devices/localhost/cpu_count`.
        # The value of the item is the actual payload.
        multi: true
# Server component
# Listens to the mqtt topic where the readings from each client are stored.
# If a new reading arrives it will be send to an influx database
# to save it for later evaluation.

tasks:
  - name: stats_mqtt_pull
    pull:
      plugin: pnp.plugins.pull.mqtt.MQTTPull
      args:
        host: !env MQTT_HOST
        topic: devices/+/stats/#
    push:
      plugin: pnp.plugins.push.timedb.InfluxPush
      selector: "{'data': payload}"
      args:
        host: "localhost"
        port: 8086
        user: "the_user"
        password: "the_password"
        database: "my_db"
        protocol: "{payload.levels[3]},device={payload.levels[1]},domain=stats value={payload.payload}"

Naive dropbox sync

# Every time a file is created or modified in /tmp
# the file is uploaded to dropbox and you are notified
# about it via pushbullet.
#
# You need to set your environment variables properly:
# - DROPBOX_API_KEY
# - SLACK_API_KEY

- name: dropbox_sync_notify
  pull:
    plugin: pnp.plugins.pull.fs.FileSystemWatcher
    args:
      path: "/tmp"
      ignore_directories: true
      events:
        - created
        - modified
      load_file: false
  push:
    - plugin: pnp.plugins.push.storage.Dropbox
      args:
        api_key: !env DROPBOX_API_KEY
      selector:
        data: "lambda data: data.source"  # Absolute path to file
        target_file_name: "lambda data: basename(data.source)"  # File name only
      deps:
        - plugin: pnp.plugins.push.notify.Slack
          args:
            api_key: !env SLACK_API_KEY  # Your slack api key.
            channel: test  # The channel to post to. Mandatory. Overridable by envelope.
          selector: data.raw_link

Changelog

0.28.0

  • Feature: Implements yaml tags !env and !include #43
  • Fix: Fixes import issue on recent scipy version #42
  • Fix: Bumps dependency speedtest-cli to fix a bug that appears on empty server ids #44

0.27.0

  • Breaking: Migrates from sanic to fastapi (some endpoints have slightly changed; see openapi docs) #34
  • Breaking: Deprecates / Removes push.notify.Pushbullet #35
  • Breaking: Deprecates / Removes pull.zway.ZwayReceiver #36
  • Breaking: Deprecates / Removes push.mail.GMail #37
  • Breaking: Deprecates / Removes pull.traffic.DeutscheBahn #38
  • Breaking: Deprecates / Removes pull.camera.MotionEyeWatcher #39
  • Breaking: Deprecates / Removes pull.sensor.OpenWeather #40
  • Breaking (dev): Refactors the plugin interface to better distinguish between sync / async plugins to provide a simpler developer interface #41
  • Breaking: Removes enable_swagger from the api configuration properties #41
  • Breaking (dev): Replaces auto_str decorator by ReprMixin based on basic_repr from fastcore package #41
  • Feature: Migrates from attrs to pydantic for container objects #41
  • Feature: Introduces a runner to start up an application context #41
  • Feature: push.mqtt.Discovery: Adds auto configuration of state- and json attributes topics #33
  • Feature: push.fs.Zipper: Adds archive_name argument to (dynamically) control the name of the created archive #31

0.26.1

  • Fix: pull.net.PortProbe: Catches a socket error when there is no route to the target host #32

0.26.0

  • Breaking: Overhaul of the console scripts (see Console Runner) #29
  • Feature: Colorful printing of configuration and logging #29
  • Feature: Migrates Dockerfile from stretch to buster #27
  • Fix: Proper handling of docker stop #30

0.25.0

  • Breaking: Removes python 3.5 support
  • Breaking: Updates dependencies
  • Feature: Adds official python 3.8 support
  • Feature: Implements pull.net.Speeedtest for testing your internet connection speed
  • Fix: Fixes documentation regarding push.fs.Zipper

0.24.0

  • Migrates the documentation to Read the Docs: https://pnp.readthedocs.io/en/latest/
  • Implements an API
  • Breaking: pull.zway.ZwayReceiver integrates it’s endpoint into the api
  • Breaking: pull.http.Server integrates the endpoint into the api
  • Breaking: Removes engine override from console runner (because we only have a single engine)
  • Breaking: Removes argresolver support
  • Breaking: Removes pull.trigger.TriggerBase and pull.trigger.Web. Tasks can be triggered via the api now

0.23.0

  • Breaking: Removes engines except for AsyncEngine
  • Implements pull.simple.RunOnce to run a polling component or a push chain once (nice for scripts)
  • Migration from setup.py to poetry
  • Migration from Makefile to python-invoke
  • Introduces new configuration concept (interface: ConfigLoader)

0.22.0

  • Updates docker base image to python 3.7
  • Adds pull.presence.FritzBoxTracker to track known devices on a Fritz!Box
  • Adds json_attributes_topic support to push.mqtt.Discovery
  • Adds pull.net.SSLVerify to check ssl certificates

0.21.1

  • Feature: Enables arm emulator for arm dockerfile to use docker hub autmated build
  • Bugfix: Removes timeout from component push.storage.Dropbox

0.21.0

  • Adds push.fs.Zipper to zip dirs and files in the process chain

0.20.2

  • Bugfix: Fixes udf throttling to take arguments into account for result caching
  • Refactors udf throttling / caching code to be more pythonic
  • Adjusts pull.simple components to act like polling components

0.20.1

  • Bugfix: Socket shutdown of pull.net.PortProbe sometimes fails in rare occasions. Is now handled properly

0.20.0

0.19.1

  • Bugfix: Adds bug workaround in schiene package used by pull.traffic.DeutscheBahn
  • Bugfix: Adds exception message truncation for logging.SlackHandler to ensure starting and ending backticks (code-view)

0.19.0

  • Adds pull.traffic.DeutscheBahn to poll the Deutsche Bahn website using the schiene package to find the next scheduled trains
  • Adds push.simple.Wait to interrupt the execution for some specified amount of time
  • Breaking: Component pull.sensor.Sound can now check multiple sound files for similarity. The configurational arguments changed. Have a look at the docs
  • Breaking: Fixes ignore_overflow of pull.sensor.Sound plugin (which actually has the opposite effect)
  • Breaking: pull.sensor.Sound can optionally trigger a cooldown event after the cooldown period expired. This is useful for a binary sensor to turn it off after the cooldown
  • Adds slack logging handler to log messages to a slack channel and optionally ping users
  • Adds pull.net.PortProbe plugin to probe a specific port if it’s being used

0.18.0

  • Integrates an asyncio featured/powered engine. I think this will be the default in the future. Stay tuned!

0.17.1

  • Fixes missing typing-extensions dependency
  • Fixes urllib3 versions due to requests incompatibilities

0.17.0

  • Adjusts inline documentation - refers to github documentation
  • Refactors a majority of codebase to comply to pylint linter
  • Integrates yamllint as linter
  • Refactores RetryDirective (namedtuple to attr class)
  • Adds decorators for parsing the envelope in a push context
  • Breaking: Removes push.simple.Execute and replace it by push.simple.TemplatedExecute
  • Adjusts method logger in plugin classes to automatically prepend plugin name
  • Integrates coveralls
  • Adds pull.ftp.Server plugin
  • Adds lazy configuration property to push.ml.FaceR (basically to test initialization of FaceR without installing face-recognition and dlib)
  • Adds pull.fs.Size plugin
  • Adds typing for most of the core codebase and adds mypy as linter

0.16.0

  • Adds ignore_overflow argument to pull.sensor.Sound to ignore buffer overflows errors on slow devices
  • Possible Breaking: Adds raspberrypi specific stats (under voltage, throttle, …) to pull.monitor.stats
  • Professionalizes docker image build process / Testing the container
  • Documentation cosmetics
  • Adds cron-like pull pull.simple.Cron
  • Adds pull.camera.MotionEyeWatcher to watch a MotionEye directory to emit events
  • Adds push.hass.Service to call home assistant services by rest-api
  • Breaking: New default value of cwd argument of push.simple.Execute is now the folder where the invoked pnp-configuration is located and not the current working directory anymore
  • Adds push.simple.TemplatedExecute as a replacement for push.simple.Execute
  • Adds cron-expressions to polling base class
  • Adds pull.sensor.MiFlora plugin to periodically poll xiaomi miflora devices

0.15.0

  • Adds push.mail.GMail to send e-mails via the gmail api
  • Adds throttle-feature to user defined functions via base class
  • Adds pull.sensor.Sound to listen to the microphone’s sound stream for occurrence of a specified sound

0.14.0

  • Adds UDF (user defined functions)
  • Adds UDF udf.hass.State to request the current state of an entity (or one of it’s attributes) from home assistant
  • Makes selector expressions in complex structures (dicts / lists) more explicit using lambda expressions with mandatory payload argument. This will probably break configs that use complex expressions containing lists and/or dictionaries
  • Adds pull.hass.State to listen to state changes in home assistant
  • Fixes bug in pull.fitbit.Goal when fetching weekly goals (so far daily goals were fetched too)
  • Adds UDF udf.simple.Memory to memorize values to access them later

0.13.0

  • Adds pull.fitbit.Current, pull.fitbit.Devices, pull.fitbit.Goal plugins to request data from fitbit api
  • Adds push.mqtt.Discovery to create mqtt discovery enabled devices for home assistant. [Reference](https://www.home-assistant.io/docs/mqtt/discovery/)
  • Adds unwrapping-feature to pushes

0.12.0

  • Adds additional argument multi (default False) to push.mqtt.MQTTPush to send multiple messages to the broker if the payload is a dictionary (see plugin docs for reference)
  • Adds plugin pull.monitor.Stats to periodically emit stats about the host system
  • Adds plugin push.notify.Pushbullet to send message via the pushbullet service
  • Adds plugin push.storage.Dropbox to upload files to a dropbox account/app
  • Adds feature to use complex lists and/or dictionary constructs in selector expressions
  • Adds plugin pull.gpio.Watcher (extra gpio) to watch gpio pins for state changes. Only works on raspberry
  • Adds plugin push.simple.Execute to run commands in a shell
  • Adds extra http-server to optionally install flask and gevent when needed
  • Adds utility method to check for installed extras
  • Adds -v | –verbose flag to pnp runner to switch logging level to DEBUG. No matter what…

0.11.3

  • Adds auto-mapping magic to the pull.zway.ZwayReceiver.
  • Adds humidity and temperature offset to dht

0.11.2

  • Fixes error catching of run_pending in Polling base class

0.11.1

  • Fixes resolution of logging configuration on startup

0.11.0

  • Introduces the pull.zway.ZwayReceiver and pull.sensor.OpenWeather component
  • Introduces logging configurations. Integrates dictmentor package to augment configuration

0.10.0

  • Introduces engines. You are not enforced to explicitly use one and backward compatibility with legacy configs is given (actually the example configs work as they did before the change). So there shouldn’t be any Breaking change.