Pull ‘n’ Push¶
Installation¶
Installation is done by pip. Simple and straightforward:
pip install pnp
Some plugins might require some extras to work properly. Installation of extras is simple as well.
Example: The plugin pnp.plugins.pull.fitbit.Current requires the extra fitbit. To install the extra:
pip install pnp[fitbit]
If you need to install multiple extras:
pip install pnp[extra1,extra2,extra3]
Please consult the plugin documentation to see if a component requires an extra or not.
Important
We cannot guarantee that interfaces / behaviour will not change in breaking ways. This might happen with any commit, whether intended or by accident. Nevertheless we try to list the breaking changes we are aware of in the changelog.
You are encouraged to specify the version explicitly in your dependency tools, e.g.:
pip install pnp==0.23.0
Warning
It seems that the package uvloop has some issues on debian / raspbian stretch. pnp can run without uvloop, but sanic installs it as a dependency anyway (it is optional in our case). Unfortunately we can’t simply not install uvloop in the first place because of pnp’s package locking mechanism powered by poetry. As far as I can tell there is no way to instruct poetry to inject the necessary environment variable to tell sanic not to install uvloop. See the sanic documentation for more details.
In short: If you experience segmentation faults after running pnp you should consider removing uvloop from your site-packages:
pip uninstall uvloop
Quickstart¶
The major concepts of Pull ‘n’ Push are pulls and pushes (surprise!).
Pulls define how to fetch data from sources. A pull passes its fetched data to one or more push components.
Note
Example: The pull pnp.plugins.pull.monitor.Stats periodically fetches statistics from the host it is running on (like cpu and ram usage). It passes its statistics on to the pnp.plugins.push.mqtt.Publish push to publish those statistics to an mqtt broker.
Gluing together pulls and pushes is done by a task. It’s similar to defining an input / output pipeline.
Now let’s get serious. Copy and paste the following configuration to the file helloworld.yaml
.
tasks:
- name: hello-world
pull:
plugin: pnp.plugins.pull.simple.Repeat
args:
interval: 1s
repeat: "Hello World"
push:
- plugin: pnp.plugins.push.simple.Echo
And run it by invoking the console runner:
pnp helloworld.yaml
This example yields the string 'Hello World' every second.
Console Runner¶
If you install pnp via pip it will automatically install a script you can invoke:
> pnp --help
Usage: pnp [OPTIONS] CONFIGFILE
Pull 'n' Push. Runs or checks the given CONFIGFILE
Options:
-c, --check Only check the given config file but does
not run it.
--no-log-probe Disables the automatic logging configuration
probing.
--log FILE Specify logging configuration to load.
--log-level [DEBUG|INFO|WARNING|ERROR]
Overrides the log level.
--version Show the version and exit.
--help Show this message and exit.
Alternatively you can call the module via python interpreter:
python3 -m pnp --help
Building Blocks¶
Pull¶
The pull is the primary building block when it comes down to fetching / retrieving data from various sources.
Some examples:
- Start up an FTP server and wait for newly uploaded files
- Listen to created, updated or deleted files in the file system
- Listen to an mqtt broker for new messages on a specific topic
- Frequently poll the fitbit api to fetch the current step count
- Frequently poll the host for statistics (like cpu / ram usage)
- …
As you can see pulls can either
- react on one or multiple events (like files created, updated, deleted)
- or frequently poll a source
to provide data to its downstream: the push
# Counts from 0 to infinity. Emits a new counter every second.
tasks:
- name: pull # Task name -> Glues together a pull and 1..n pushes
pull:
plugin: pnp.plugins.pull.simple.Count # Fully qualified path to the pull
args:
interval: 1s # Every second
push:
- plugin: pnp.plugins.push.simple.Echo
Push¶
A push simply processes the downstream data from a pull.
Some examples:
- Publish a message on a specific topic on an mqtt broker
- Send a templated notification via slack
- Execute a script using derived arguments
- Send an http request to a server
- Simply echo the data to the console (nice for debugging)
- …
# The first push will publish the counter to the 'home/counter/state' topic
# on a locally running mqtt broker.
# The second push will print the counter to the console.
tasks:
- name: push
pull:
plugin: pnp.plugins.pull.simple.Count
push:
- plugin: pnp.plugins.push.mqtt.Publish # Fully qualified path of the push
args: # Arguments
host: localhost
topic: home/counter/state
port: 1883
retain: true
# You can specify more than one push.
# Multiple pushes will be executed in parallel.
- plugin: pnp.plugins.push.simple.Echo
Selector¶
Sometimes the output of a pull needs to be transformed before the downstream push can handle it. Selectors to the rescue: A selector transforms the output of a pull by using pure python code.
# The selector will transform the output to 'x is Even' resp. 'x is Odd'
# depending on whether the counter is even or odd.
tasks:
- name: selector
pull:
plugin: pnp.plugins.pull.simple.Count
args:
interval: 1s
push:
- plugin: pnp.plugins.push.simple.Echo
# The selector is written in pure python
selector: "'{} is {}'.format(payload, 'Even' if payload % 2 == 0 else 'Odd')"
Easy as that. We can reference our incoming data via data or payload.
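Conceptually (this is only a sketch, not pnp's actual implementation) a string selector behaves like a python expression evaluated with the incoming data bound to both names:

```python
def apply_selector(selector, payload):
    # Evaluate the selector expression with the pull's output
    # available as both `payload` and `data`.
    return eval(selector, {}, {"payload": payload, "data": payload})

apply_selector(
    "'{} is {}'.format(payload, 'Even' if payload % 2 == 0 else 'Odd')", 4
)  # → '4 is Even'
```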
Dependencies¶
By default all defined pushes will execute in parallel when new incoming data is available. But it would be awesome if we could chain pushes together, so that the output of one push becomes the input of the next push. The good thing is: Yes we can.
# Executes the command `echo hello world` at the shell level
# and passes the exit code, stdout and stderr to the next
# downstream push.
tasks:
- name: dependencies
pull:
plugin: pnp.plugins.pull.simple.Count
args:
interval: 1s
push:
plugin: pnp.plugins.push.simple.Execute
args:
command: "echo hello world"
capture: true # Capture exit code, stdout and stderr
deps:
- plugin: pnp.plugins.push.simple.Echo
Envelope¶
New in version 0.7.0.
By using an envelope it is possible to change the behavior of pushes during runtime. One of the best examples is the pnp.plugins.push.mqtt.Publish plugin, where you can override the actual topic the message should be published to.
This is easier to understand with a practical example:
# Uses an advanced selector to override push arguments during
# runtime
tasks:
- name: envelope
pull:
plugin: pnp.plugins.pull.simple.Count
args:
interval: 1s
push:
- plugin: pnp.plugins.push.mqtt.Publish
# This is an advanced selector. See section Advanced Selector for details
selector:
# This is the actual counter
payload: "lambda payload: payload"
# This configures the topic based on the counter payload
# It can be either counter/even or counter/odd
topic: "lambda payload: 'counter/{}'.format('even' if payload % 2 == 0 else 'odd')"
args:
host: localhost
topic: home/counter/state
port: 1883
retain: true
How does this work?
If the emitted or transformed payload (via selector) contains the key data or payload, the pipeline assumes that this is the actual payload and all other keys represent the so-called envelope.
Note
It is easy to construct envelopes by using the Advanced Selector Expression
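A minimal sketch of that splitting rule (hypothetical helper, not pnp's actual code):

```python
def split_envelope(value):
    # If the (transformed) payload is a dict containing 'data' or
    # 'payload', that key holds the real payload; everything else
    # is treated as the envelope (e.g. an overridden mqtt topic).
    if isinstance(value, dict) and ("data" in value or "payload" in value):
        payload = value.get("data", value.get("payload"))
        envelope = {k: v for k, v in value.items() if k not in ("data", "payload")}
        return payload, envelope
    return value, {}

split_envelope({"payload": 42, "topic": "counter/even"})
# → (42, {'topic': 'counter/even'})
```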
UDFs¶
New in version 0.14.0.
When transforming the output of a pull with the help of a selector you sometimes need to call python functions to perform certain operations. A selector only provides very few basic functions out of the box, like type conversions (str, float, …).
But the good thing is: You can register simple python builtin functions, special user defined functions and your own python functions to make use of them.
Besides providing function logic, UDFs can carry state and special behavior like out-of-the-box throttling.
udfs:
# Register the builtin str callable as my_str
- name: my_str
plugin: str
# Register a custom callable
- name: my_on_off
plugin: pnp.utils.on_off
# Instantiate a Counter user defined function
- name: count
plugin: pnp.plugins.udf.simple.Counter
# The presence of args tells pnp to instantiate a Counter
# -> important because it has a state (the actual count)
args:
init: 1
- name: count0
plugin: pnp.plugins.udf.simple.Counter
# Even without arguments (init is 0 by default), the presence of args is important:
# This tells pnp to instantiate a Counter. Otherwise it is assumed to be a callable
args:
- name: count_err
# Without args. Each time you call count_err this will actually call the __init__ of the Counter class
plugin: pnp.plugins.udf.simple.Counter
tasks:
- name: hello-world
pull:
plugin: pnp.plugins.pull.simple.Repeat
args:
interval: 1s
repeat: "Hello World"
push:
- plugin: pnp.plugins.push.simple.Echo
selector:
on_off: "lambda d: my_on_off(True)"
str: "lambda d: my_str(d)"
cnt0: "lambda d: count0()"
cnt1: "lambda d: count()"
# This will actually create a Counter instance instead of counting
cnterr: "lambda d: count_err(name='error')"
data: "lambda d: d"
Advanced topics¶
Advanced selector expressions¶
New in version 0.12.0.
Instead of string-only selector expressions, you may now use complex dictionary and/or list constructs in your yaml to define a selector expression. If you use a dictionary or a list make sure to provide “real” selectors as a lambda expression, so the evaluator can decide if this is a string literal or an expression to evaluate.
The configuration below will repeat
{'hello': 'Hello', 'words': ['World', 'Moon', 'Mars']}
tasks:
- name: selector
pull:
plugin: pnp.plugins.pull.simple.Repeat
args:
wait: 1
repeat: "Hello World Moon Mars"
push:
- plugin: pnp.plugins.push.simple.Echo
selector:
hello: "lambda payload: payload.split(' ')[0]"
words:
- "lambda payload: payload.split(' ')[1]"
- "lambda payload: payload.split(' ')[2]"
- "lambda payload: payload.split(' ')[3]"
Prior to the implementation of the advanced selector feature your expressions would have probably looked similar to this:
dict(hello=payload.split(' ')[0], words=[payload.split(' ')[1], payload.split(' ')[2], payload.split(' ')[3]])
The first one is more readable, isn’t it?
A more complex example showcasing literals and lambda expressions in more depth:
tasks:
- name: selector
pull:
plugin: pnp.plugins.pull.simple.Repeat
args:
interval: 1s
repeat: "Hello World"
push:
- plugin: pnp.plugins.push.simple.Echo
# Returns: 'Hello'
# no complex structure. Evaluator assumes that this is an expression -> you do not need a lambda
selector: "str(payload.split(' ')[0])"
- plugin: pnp.plugins.push.simple.Echo
# Returns {'header': 'this is a header', 'data': 'World', 'Hello': 'World'}
selector:
# Just string literals
header: this is a header
# Value is lambda and therefore evaluated
data: "lambda data: data.split(' ')[1]"
# Both are lambdas and therefore evaluated
"lambda data: str(data.split(' ')[0])": "lambda data: data.split(' ')[1]"
- plugin: pnp.plugins.push.simple.Echo
# Returns ['foo', 'bar', 'Hello', 'World']
selector:
- foo # String literal
- bar # String literal
# Lambda -> evaluate the expression
- "lambda d: d.split(' ')[0]"
# Lambda -> evaluate the expression
- "lambda d: d.split(' ')[1]"
API¶
New in version 0.24.0.
You can explicitly activate a REST API to perform various tasks:
Determine if everything is up and running:
curl -X GET "http://localhost:9999/health"
Retrieve the current python and pnp version:
curl -X GET "http://localhost:9999/version"
Change the log level at runtime. Useful if you want to track down an issue. Example:
curl -X POST "http://localhost:9999/loglevel?level=DEBUG"
Retrieve prometheus compliant metrics:
curl -X GET "http://localhost:9999/metrics"
See the openapi specification in your browser: http://localhost:9999/docs
Trigger tasks manually (even when the task has some schedule assigned):
curl -X POST "http://localhost:9999/trigger?task=<task_name>"
Note
You need to explicitly enable the /metrics endpoint while configuring your api. Please see the example below for reference.
api:
port: 9999 # API listens on port 9999; mandatory
endpoints: # Optional
# Enable metrics endpoint: http://localhost:9999/metrics, default is false.
metrics: true
tasks:
- name: task
pull:
plugin: pnp.plugins.pull.monitor.Stats
args:
instant_run: true
interval: "*/1 * * * *"
push:
- plugin: pnp.plugins.push.simple.Echo
To create a task with no schedule assigned you can pass nothing to the interval argument of your task. This way the task will not be scheduled automatically. The only way to run it is to trigger it explicitly via the api.
api:
port: 9999
endpoints:
metrics: false
tasks:
- name: my_task
pull:
plugin: pnp.plugins.pull.monitor.Stats
args:
# No schedule -> Execution: `curl -X POST http://localhost:9999/trigger?task=my_task`
interval:
push:
- plugin: pnp.plugins.push.simple.Echo
Augment configuration¶
Deprecated since version 0.28.0: Use YAML Tags. dictmentor for configuration augmentation will be removed in a future release.
You can augment the configuration by extensions from the dictmentor package. Please see dictmentor on Github for further reference.
The dictmentor instance will be instantiated with the following code and thus the following extensions:
from dictmentor import DictMentor, ext
return DictMentor(
ext.Environment(fail_on_unset=True),
ext.ExternalResource(base_path=os.path.dirname(config_path)),
ext.ExternalYamlResource(base_path=os.path.dirname(config_path))
)
The following configuration will demonstrate the configuration augmentation in more depth:
# Uses the dictmentor package to augment the configuration
# by dictmentor extensions.
# Make sure to export the environment variable to echo:
# export MESSAGE="Hello World"
tasks:
- name: dictmentor
pull:
external: repeat.pull
push:
- external: echo.push
- external: nop.push
# Contents of repeat.pull
plugin: pnp.plugins.pull.simple.Repeat
args:
wait: 1
repeat: "{{env::MESSAGE}}"
# Contents of echo.push
plugin: pnp.plugins.push.simple.Echo
# Contents of nop.push
plugin: pnp.plugins.push.simple.Nop
Engines¶
In version 0.22.0 I decided to remove all engines except for the Async engine. Testing showed this engine is amazingly stable, has great performance, and you do not need to think about synchronizing parallel tasks so much.
This decision was basically driven from a maintenance perspective: in the future I would like to add some infrastructural code, like an api to communicate with the engine, and I will not be able to integrate the necessary changes into all of the engines.
By default the AsyncEngine is used implicitly:
tasks: # Implicit use of the AsyncEngine
- name: async
pull:
plugin: pnp.plugins.pull.simple.Repeat
args:
interval: 1s
repeat: "Hello World"
push:
- plugin: pnp.plugins.push.simple.Echo
You can add it explicitly though. This might be useful in the future IF I decide to add variants of the AsyncEngine or if you want to use a RetryHandler.
engine: !engine # Use the AsyncEngine explicitly
type: pnp.engines.AsyncEngine
retry_handler: !retry
type: pnp.engines.SimpleRetryHandler
tasks:
- name: async
pull:
plugin: pnp.plugins.pull.simple.Repeat
args:
interval: 1s
repeat: "Hello World"
push:
- plugin: pnp.plugins.push.simple.Echo
Logging¶
New in version 0.11.0.
You can use different logging configurations in two ways: either by specifying the logging configuration when starting pnp via the console or by setting the environment variable PNP_LOG_CONF and then running pnp.
# Specify explicitly when starting pnp
pnp --log=<logging_configuration> <pnp_configuration>
# Specify by environment variable
export PNP_LOG_CONF=<logging_configuration>
pnp <pnp_configuration>
If you do not specify a logging configuration explicitly pnp will search for a logging.yaml
1. in the current working directory
2. in the directory where your pnp configuration file is located
You can disable this automatic and implicit logging configuration probing by passing --no-log-probe:
pnp --no-log-probe <pnp_configuration>
A simple logging configuration that will log severe errors to a separate rotating log file looks like this:
version: 1
disable_existing_loggers: False
formatters:
simple:
format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
handlers:
console:
class: logging.StreamHandler
level: DEBUG
formatter: simple
stream: ext://sys.stdout
error_file_handler:
class: logging.handlers.RotatingFileHandler
level: ERROR
formatter: simple
filename: errors.log
maxBytes: 10485760 # 10MB
backupCount: 20
encoding: utf8
root:
level: INFO
handlers: [console, error_file_handler]
Payload unwrapping¶
Sometimes a pull returns a list of items, or you created a list of readings by using the selector. As long as a push can process a list as an input everything is fine. But now imagine that you want to process each item of that list individually. Unwrapping to the rescue: Unwrapping does exactly what was asked for above and passes each item of a list to a push individually.
tasks:
- name: unwrapping
pull:
plugin: pnp.plugins.pull.simple.Repeat
args:
interval: 1s
repeat:
- 1
- 2
- 3
push:
- plugin: pnp.plugins.push.simple.Echo
unwrap: true # Magic!
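The unwrap mechanics can be sketched roughly like this (hypothetical helper, not pnp's actual code):

```python
def dispatch(payload, push, unwrap=False):
    # With unwrap enabled a list payload is fed to the push item by
    # item; otherwise the payload is pushed as a whole.
    items = payload if unwrap and isinstance(payload, list) else [payload]
    for item in items:
        push(item)

received = []
dispatch([1, 2, 3], received.append, unwrap=True)
# received is now [1, 2, 3] - one push call per item
```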
But sometimes you want to add items to the list before processing it item by item. You can do this by using a push.simple.Nop properly.
tasks:
- name: unwrapping
pull:
plugin: pnp.plugins.pull.simple.Repeat
args:
interval: 1s
repeat:
- 1
- 2
- 3
push:
- plugin: pnp.plugins.push.simple.Nop
# You can add items to a list ...
selector: "data + [4, 5, 6]"
# ... and not unwrap (which is the default) ...
unwrap: false
# ... and use unwrap: true in a dependent push
deps:
- plugin: pnp.plugins.push.simple.Echo
unwrap: true
Retry handler¶
By using the explicit form of the engine specification you can specify a so-called RetryHandler as well. A RetryHandler controls what happens if your pull unexpectedly exits or raises an error.
NoRetryHandler
Will not retry to run the pull if it fails. If all pulls exit for some reason pnp will exit as well.
engine: !engine
type: pnp.engines.AsyncEngine
retry_handler: !retry
type: pnp.engines.NoRetryHandler
tasks:
- name: async
pull:
plugin: pnp.plugins.pull.simple.Repeat
args:
interval: 1s
repeat: "Hello World"
push:
- plugin: pnp.plugins.push.simple.Echo
SimpleRetryHandler
Unlike the NoRetryHandler, this handler will retry after a specified amount of time, configured by passing retry_wait (default is 60 seconds).
engine: !engine # Use the AsyncEngine explicitly
type: pnp.engines.AsyncEngine
retry_handler: !retry
type: pnp.engines.SimpleRetryHandler
retry_wait: 60s # Default is 60 seconds as well
tasks:
- name: async
pull:
plugin: pnp.plugins.pull.simple.Repeat
args:
interval: 1s
repeat: "Hello World"
push:
- plugin: pnp.plugins.push.simple.Echo
LimitedRetryHandler
In addition to retrying the pull you can configure how many retries are allowed by passing max_retries (default is 3). Each failed pull execution will increase the counter and the counter will not reset.
engine: !engine # Use the AsyncEngine explicitly
type: pnp.engines.AsyncEngine
retry_handler: !retry
type: pnp.engines.LimitedRetryHandler
retry_wait: 60s # Default is 60 seconds as well
# Retry to run the pull 3 times before giving up
max_retries: 3
tasks:
- name: async
pull:
plugin: pnp.plugins.pull.simple.Repeat
args:
interval: 1s
repeat: "Hello World"
push:
- plugin: pnp.plugins.push.simple.Echo
AdvancedRetryHandler
In addition to the LimitedRetryHandler, the retry counter will be reset if the pull has run successfully for a specific amount of time, configured by passing reset_retry_threshold (default is 60 seconds).
engine: !engine # Use the AsyncEngine explicitly
type: pnp.engines.AsyncEngine
retry_handler: !retry
type: pnp.engines.AdvancedRetryHandler
retry_wait: 60s # Default is 60 seconds as well
# Retry to run the pull 3 times before giving up
max_retries: 3
# Will reset the retry count after 60 seconds
# of successful execution of the pull
reset_retry_threshold: 60s
tasks:
- name: async
pull:
plugin: pnp.plugins.pull.simple.Repeat
args:
interval: 1s
repeat: "Hello World"
push:
- plugin: pnp.plugins.push.simple.Echo
Note
If no RetryHandler is explicitly specified the AdvancedRetryHandler will be used. The instance created will use the default values for its arguments.
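The reset-and-retry logic described above can be sketched like this (a simplified stand-in, not pnp's actual engine code):

```python
import time


class AdvancedRetrySketch:
    """Simplified sketch of the described behavior: retry up to
    max_retries, but reset the counter once the pull ran successfully
    for reset_retry_threshold seconds."""

    def __init__(self, max_retries=3, reset_retry_threshold=60.0):
        self.max_retries = max_retries
        self.threshold = reset_retry_threshold
        self.retries = 0

    def should_retry(self, started_at):
        # `started_at` is when the now-failed pull run began.
        if time.monotonic() - started_at >= self.threshold:
            self.retries = 0  # ran long enough -> forget earlier failures
        self.retries += 1
        return self.retries <= self.max_retries
```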
Suppress push¶
Sometimes you want to suppress (i.e. not execute) a push based on the actual incoming payload from a pull. To achieve this all you have to do is emit the special sentinel SUPPRESS from your selector expression.
tasks:
- name: selector
pull:
plugin: pnp.plugins.pull.simple.Count
args:
interval: 1s
push:
- plugin: pnp.plugins.push.simple.Echo
# Only print the counter if it is even. Suppress odd ones.
selector: "payload if payload % 2 == 0 else SUPPRESS"
UDF Throttle¶
Consider the following situation: You have a selector that uses a udf to fetch a state from an external system.
The state won’t change very often, but your selector will fetch the state every time a pull transports a payload to the push. You want to decrease the load on the external system and increase throughput.
Throttling to the rescue: Specifying throttle when instantiating your udf will make sure that your udf only calls the external system once and caches the result. Subsequent calls will either return the cached result or call the external system again when the specified time has passed since the last call that actually fetched a result from the external system.
udfs:
- name: count # Instantiate a Counter user defined function
plugin: pnp.plugins.udf.simple.Counter
args: # The presence of args tells pnp to instantiate a Counter - important because it has a state (the actual count)
init: 1
# Will only call the counter if 10 seconds passed between current call and last call.
# In the meantime a cached result will be returned.
throttle: 10s
tasks:
- name: hello-world
pull:
plugin: pnp.plugins.pull.simple.Repeat
args:
interval: 1s
repeat: "Hello World"
push:
- plugin: pnp.plugins.push.simple.Echo
selector:
counter: "lambda d: count()"
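The throttling behavior can be sketched like this (a simplified stand-in, not pnp's actual implementation):

```python
import time


class Throttled:
    """Wrap a callable so it is invoked at most once per `seconds`;
    in between, the cached result is returned."""

    def __init__(self, fn, seconds):
        self.fn = fn
        self.seconds = seconds
        self._last = None
        self._cache = None

    def __call__(self, *args, **kwargs):
        now = time.monotonic()
        if self._last is None or now - self._last >= self.seconds:
            self._cache = self.fn(*args, **kwargs)
            self._last = now
        return self._cache


calls = []
fetch = Throttled(lambda: calls.append(1) or len(calls), seconds=10)
fetch()  # calls the underlying function
fetch()  # returns the cached result; `calls` still has one entry
```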
YAML Tags¶
You can use yaml tags to augment your yaml configuration. Right now the following tags are supported:
- !include: To incorporate another yaml file into your configuration
- !env: To inject an environment variable. Use := to denote a default value. The configuration loading process will fail if the environment variable is not set and no default is given.
The following example will demonstrate the yaml tags in more depth:
# Use !include to incorporate another yaml file into this one
tasks:
- name: tags
pull: !include _repeat.yaml
push:
- !include _echo.yaml
- !include _nop.yaml
# Contents of _repeat.yaml
plugin: pnp.plugins.pull.simple.Repeat
args:
wait: 1
repeat: !env MESSAGE:=Hello World
# Contents of _echo.yaml
plugin: pnp.plugins.push.simple.Echo
# Contents of _nop.yaml
plugin: pnp.plugins.push.simple.Nop
Plugins¶
Pulls¶
Pulls can be divided into pulls that react on events and pulls that regularly poll for data.
So called Polling components are special pulls that, as stated earlier, regularly poll data or just execute at regular intervals.
Besides the arguments stated in the component description, polls always have the following arguments to control their polling behavior.
name | type | opt. | default | description |
---|---|---|---|---|
interval | str/float | yes | 60s | You may specify duration literals such as 60 (60 secs), 1m , 1h (…) to realize a periodic polling or cron expressions e.g. */1 * * * * (every minute) to realize cron like behavior. |
instant_run | bool | yes | False | If set to True the component will run as soon as pnp starts; otherwise it will first run at the next configured interval. |
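The duration literals can be thought of as simple suffix arithmetic. A rough sketch (hypothetical parser, not pnp's actual one; cron expressions are not covered):

```python
def parse_duration(literal):
    # Map '60' (plain seconds), '60s', '1m', '1h', '1d' to seconds.
    units = {"s": 1, "m": 60, "h": 3600, "d": 86400}
    s = str(literal).strip()
    if s and s[-1] in units:
        return float(s[:-1]) * units[s[-1]]
    return float(s)
```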
fitbit.Current¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.pull.fitbit.Current | poll | fitbit | 0.13.0 |
Description
Requests various current metrics (steps, calories, distance, …) from the fitbit api for a specific account.
Please see Fitbit Authentication to prepare your account accordingly.
Arguments
name | type | opt. | default | description |
---|---|---|---|---|
config | str | no | n/a | The configuration file that keeps your initial and refreshed authentication tokens (see Fitbit Authentication for detailed information) |
resources | List[str] | no | n/a | The resources to request (see below for possible options) |
system | str | yes | None | The metric system to use based on your localisation (de_DE, en_US, …). Default is your configured metric system in your fitbit account |
Note
You can query the following resources:
- activities/calories
- activities/caloriesBMR
- activities/steps
- activities/distance
- activities/floors
- activities/elevation
- activities/minutesSedentary
- activities/minutesLightlyActive
- activities/minutesFairlyActive
- activities/minutesVeryActive
- activities/activityCalories
- body/bmi
- body/fat
- body/weight
- foods/log/caloriesIn
- foods/log/water
- sleep/awakeningsCount
- sleep/efficiency
- sleep/minutesAfterWakeup
- sleep/minutesAsleep
- sleep/minutesAwake
- sleep/minutesToFallAsleep
- sleep/startTime
- sleep/timeInBed
Result
Emits a map that contains the requested resources and their associated values:
{
"activities/calories": 1216,
"activities/caloriesBMR": 781,
"activities/steps": 4048,
"activities/distance": 3.02385,
"activities/floors": 4,
"activities/elevation": 12,
"activities/minutes_sedentary": 127,
"activities/minutes_lightly_active": 61,
"activities/minutes_fairly_active": 8,
"activities/minutes_very_active": 24,
"activities/activity_calories": 484,
"body/bmi": 23.086421966552734,
"body/fat": 0.0,
"body/weight": 74.8,
"foods/log/calories_in": 0,
"foods/log/water": 0.0,
"sleep/awakenings_count": 0,
"sleep/efficiency": 84,
"sleep/minutes_after_wakeup": 0,
"sleep/minutes_asleep": 369,
"sleep/minutes_awake": 69,
"sleep/minutes_to_fall_asleep": 0,
"sleep/start_time": "21:50",
"sleep/time_in_bed": 438
}
Example
# Please point your environment variable `FITBIT_AUTH` to your authentication
# configuration
- name: fitbit_current
pull:
plugin: pnp.plugins.pull.fitbit.Current
args:
config: !env FITBIT_AUTH
instant_run: true
interval: 5m
resources:
- 'activities/calories'
- 'activities/caloriesBMR'
- 'activities/steps'
- 'activities/distance'
- 'activities/floors'
- 'activities/elevation'
- 'activities/minutesSedentary'
- 'activities/minutesLightlyActive'
- 'activities/minutesFairlyActive'
- 'activities/minutesVeryActive'
- 'activities/activityCalories'
- 'body/bmi'
- 'body/fat'
- 'body/weight'
- 'foods/log/caloriesIn'
- 'foods/log/water'
- 'sleep/awakeningsCount'
- 'sleep/efficiency'
- 'sleep/minutesAfterWakeup'
- 'sleep/minutesAsleep'
- 'sleep/minutesAwake'
- 'sleep/minutesToFallAsleep'
- 'sleep/startTime'
- 'sleep/timeInBed'
push:
- plugin: pnp.plugins.push.simple.Echo
fitbit.Devices¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.pull.fitbit.Devices | poll | fitbit | 0.13.0 |
Description
Requests details about your fitbit devices / trackers (battery, model, …) associated with your account.
Please see Fitbit Authentication to prepare your account accordingly.
Arguments
name | type | opt. | default | description |
---|---|---|---|---|
config | str | no | n/a | The configuration file that keeps your initial and refreshed authentication tokens (see Fitbit Authentication for detailed information) |
system | str | yes | None | The metric system to use based on your localisation (de_DE, en_US, …). Default is your configured metric system in your fitbit account |
Result
Emits a list that contains your available trackers and/or devices and their associated details:
[{
"battery": "Empty",
"battery_level": 10,
"device_version": "Charge 2",
"features": [],
"id": "abc",
"last_sync_time": "2018-12-23T10:47:40.000",
"mac": "AAAAAAAAAAAA",
"type": "TRACKER"
}, {
"battery": "High",
"battery_level": 95,
"device_version": "Blaze",
"features": [],
"id": "xyz",
"last_sync_time": "2019-01-02T10:48:39.000",
"mac": "FFFFFFFFFFFF",
"type": "TRACKER"
}]
Example
# Please point your environment variable `FITBIT_AUTH` to your authentication
# configuration
tasks:
- name: fitbit_devices
pull:
plugin: pnp.plugins.pull.fitbit.Devices
args:
config: !env FITBIT_AUTH
instant_run: true
interval: 15m
push:
- plugin: pnp.plugins.push.simple.Echo
fitbit.Goal¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.pull.fitbit.Goal | poll | fitbit | 0.13.0 |
Description
Requests your goals (water, steps, …) from the fitbit api.
Please see Fitbit Authentication to prepare your account accordingly.
Arguments
name | type | opt. | default | description |
---|---|---|---|---|
config | str | no | n/a | The configuration file that keeps your initial and refreshed authentication tokens (see Fitbit Authentication for detailed information) |
resources | List[str] | no | n/a | The resources to request (see below for possible options) |
system | str | yes | None | The metric system to use based on your localisation (de_DE, en_US, …). Default is your configured metric system in your fitbit account |
Note
You can query the following resources:
- body/fat
- body/weight
- activities/daily/activeMinutes
- activities/daily/caloriesOut
- activities/daily/distance
- activities/daily/floors
- activities/daily/steps
- activities/weekly/distance
- activities/weekly/floors
- activities/weekly/steps
- foods/calories
- foods/water
Result
Emits a map structure that consists of the requested goals:
{
"body/fat": 15.0,
"body/weight": 70.0,
"activities/daily/active_minutes": 30,
"activities/daily/calories_out": 2100,
"activities/daily/distance": 5.0,
"activities/daily/floors": 10,
"activities/daily/steps": 6000,
"activities/weekly/distance": 5.0,
"activities/weekly/floors": 10.0,
"activities/weekly/steps": 6000.0,
"foods/calories": 2220,
"foods/water": 1893
}
Example
# Please point your environment variable `FITBIT_AUTH` to your authentication
# configuration
tasks:
- name: fitbit_goal
pull:
plugin: pnp.plugins.pull.fitbit.Goal
args:
config: !env FITBIT_AUTH
instant_run: true
interval: 5m
goals:
- body/fat
- body/weight
- activities/daily/activeMinutes
- activities/daily/caloriesOut
- activities/daily/distance
- activities/daily/floors
- activities/daily/steps
- activities/weekly/distance
- activities/weekly/floors
- activities/weekly/steps
- foods/calories
- foods/water
push:
- plugin: pnp.plugins.push.simple.Echo
fs.FileSystemWatcher¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.pull.fs.FileSystemWatcher | pull | fswatcher | < 0.10.0 |
Description
Watches the given directory for changes like created, moved, modified and deleted files.
By default it will recursively report any file that is touched, changed or deleted in the given path. The directory itself and subdirectories will be subject to reporting too, if ignore_directories is set to False.
Arguments
name | type | opt. | default | description |
---|---|---|---|---|
path | str | no | n/a | The path to track for file / directory changes |
recursive | bool | yes | True | If set to True, any subfolders of the given path will be tracked too. |
patterns | List[str] | yes | None | Any file pattern (e.g. *.txt or *.txt, *.md). If set to None no filter is applied. |
ignore_patterns | List[str] | yes | None | Any patterns to ignore (specify like argument patterns). If set to None, nothing will be ignored. |
ignore_directories | bool | yes | False | If set to True, events for directories will not be reported when files change. |
case_sensitive | bool | yes | False | If set to True, any pattern is case sensitive, otherwise it is case insensitive. |
events | List[str] | yes | None | The events to track. One or multiple of ‘moved’, ‘deleted’, ‘created’ and/or ‘modified’. If set to None all events will be reported. |
load_file | bool | yes | False | If set to True the file contents will be loaded into the result. |
mode | str | yes | auto | Open mode of the file (only necessary when load_file is True). Can be text, binary or auto (guessing). |
base64 | bool | yes | False | If set to True the loaded file contents will be converted to base64 (only applicable when load_file is True). Argument mode will be automatically set to ‘binary’ |
defer_modified | float | yes | 0.5 | There might be multiple flushes of a file before it is written completely to disk. Without defer_modified each flush would raise a modified event; with it the event is only emitted once the file has been stable for the given number of seconds. |
Result
Example of an emitted message:
{
    "operation": "modified",
    "source": "/tmp/abc.txt",
    "is_directory": False,
    "destination": None,  # Only non-None when operation = "moved"
    "file": {  # Only present when load_file is True
        "file_name": "abc.txt",
        "content": "foo and bar",
        "read_mode": "text",
        "base64": False
    }
}
Example
tasks:
  - name: file_watcher
    pull:
      plugin: pnp.plugins.pull.fs.FileSystemWatcher
      args:
        path: "/tmp"
        ignore_directories: true
        events: [created, deleted, modified]
        load_file: false
    push:
      plugin: pnp.plugins.push.simple.Echo
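When load_file and base64 are both enabled, the content field of the emitted message holds a base64 string. A minimal sketch of getting the raw bytes back (the helper name decode_content is hypothetical, not part of pnp):

```python
import base64

def decode_content(file_entry: dict) -> bytes:
    """Return the raw bytes of the 'file' entry of an emitted message.

    Hypothetical helper: assumes the message layout shown above
    (keys 'content' and 'base64')."""
    if file_entry.get("base64"):
        return base64.b64decode(file_entry["content"])
    # In text mode the content is already a plain string
    return file_entry["content"].encode()
```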
fs.Size¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.pull.fs.Size | poll | none | 0.17.0 |
Description
Periodically determines the size of the specified files or directories in bytes.
Arguments
name | type | opt. | default | description |
---|---|---|---|---|
paths | List[str] | no | n/a | List of files and/or directories to monitor their sizes in bytes. |
fail_on_error | bool | yes | True | If set to true, the plugin will raise an error when a file/directory does not exist or any other file system related error occurs. Otherwise the plugin will proceed and simply report None as size. |
Note
Be careful when adding directories with a large amount of files. This will be pretty slow because the plugin iterates over each file and determines its individual size.
Result
Example of an emitted message. Size is in bytes.
{
    "logs": 32899586,
    "copy": 28912
}
Example
tasks:
  - name: file_size
    pull:
      plugin: pnp.plugins.pull.fs.Size
      args:
        instant_run: true
        interval: 5s
        fail_on_error: false
        paths:
          logs: /var/log  # directory - recursively determines size
          copy: /bin/cp  # file
    push:
      plugin: pnp.plugins.push.simple.Echo
ftp.Server¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.pull.ftp.Server | pull | ftp | 0.17.0 |
Description
Runs an ftp server on the specified port to receive and send files via the ftp protocol. Optionally sets up a simple user/password authentication mechanism.
Arguments
name | type | opt. | default | description |
---|---|---|---|---|
directory | str | yes | None | The directory to serve via ftp protocol. If not given, a temporary directory is created to accept incoming uploads. |
port | int | yes | 2121 | The port to listen on. |
user_pwd | str / Tuple[str] | yes | None | User/password combination (as a tuple/list; see example). You may specify the user only (password will be empty), or enable anonymous access by not providing the argument. |
events | List[str] | yes | None | A list of events to subscribe to. Available events are: connect, disconnect, login, logout, file_received, file_sent, file_received_incomplete, file_sent_incomplete. By default all events are subscribed. |
max_cons | int | yes | 256 | The maximum number of simultaneous connections the ftpserver will permit. |
max_cons_ip | int | yes | 5 | The maximum number of simultaneous connections from the same ip address. |
Result
All emitted messages will have an event field to identify the type of the event and an - optional - data field.
The data field will contain the user for login/logout events and the file_path for file-related events.
{
    "event": "file_received",
    "data": {
        "file_path": "/private/tmp/ftp/test.txt"
    }
}
Example
tasks:
  - name: ftp_server
    pull:
      plugin: pnp.plugins.pull.ftp.Server
      args:
        directory: !env FTP_DIR
        user_pwd: [admin, root]  # user: admin, pw: root
        events:
          - file_received
          - file_sent
    push:
      - plugin: pnp.plugins.push.simple.Echo
gpio.Watcher¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.pull.gpio.Watcher | poll | gpio | 0.12.0 |
Description
Listens for low/high state changes on the configured gpio pins.
In more detail the plugin can raise events when one of the following situations occurs:
- rising (high) of a gpio pin - multiple events may occur in a short period of time
- falling (low) of a gpio pin - multiple events may occur in a short period of time
- switch of gpio pin - will suppress multiple events a defined period of time (bounce time)
- motion of gpio pin - will raise the event motion_on if the pin rises and set a timer with a configurable amount of time. Any other gpio rising events will reset the timer. When the timer expires the motion_off event is raised.
Arguments
name | type | opt. | default | description |
---|---|---|---|---|
pins | List[int] | no | n/a | The gpio pins to observe for state changes. Please see the examples section on how to configure it. |
default | str | no | n/a | The default edge that is applied when not configured. Please see the examples section for further details. One of rising, falling, switch, motion. |
Result
Emits a dictionary with the gpio pin whose state changed and the event that occurred:
{
    "gpio_pin": 17,  # The gpio pin whose state has changed
    "event": "rising"  # One of [rising, falling, switch, motion_on, motion_off]
}
Example
tasks:
  - name: gpio
    pull:
      plugin: pnp.plugins.pull.gpio.Watcher
      args:
        default: rising
        pins:
          - 2  # No mode specified: Default mode (in this case 'rising')
          - 2  # Duplicates will get ignored
          - 3:rising  # Equal to '3' (without explicit mode)
          - 3:falling  # Get the falling event for gpio pin 3 as well
          - 4:switch  # Uses some debouncing magic and emits only one rising event
          - 5:switch(1000)  # Specify debounce in milliseconds (default is 500ms)
          - 5:switch(500)  # Duplicates - even when they have other arguments - will get ignored
          - 7:motion  # Uses some delay magic to emit only one motion on and one motion off event
          - 9:motion(1m)  # Specify delay (default is 30 seconds)
    push:
      - plugin: pnp.plugins.push.simple.Echo
hass.State¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.pull.hass.State | pull | none | 0.14.0 |
Description
Connects to the home assistant websocket api and listens for state changes. If no include or exclude is defined it will report all state changes. If include is defined, only entities that match one of the specified patterns will be emitted. If exclude is defined, entities that match at least one of the specified patterns will be ignored. exclude patterns override include patterns.
Arguments
name | type | opt. | default | description |
---|---|---|---|---|
url | str | no | n/a | Url to your home assistant instance (e.g. http://my-hass:8123 ) |
token | str | no | n/a | Your long lived access token to access the websocket api. See below for further instructions |
include | List[str] | yes | n/a | Patterns of entity state changes to include. All state changes that do not match the defined patterns will be ignored |
exclude | List[str] | yes | n/a | Patterns of entity state changes to exclude. All state changes that do match the defined patterns will be ignored |
Note
- include and exclude support wildcards (e.g. * and ?)
- exclude overrides include. So you can include everything from a domain (sensor.*) but exclude individual entities.
- Create a long lived access token: Home Assistant documentation
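The include / exclude semantics (wildcards, exclude wins) can be illustrated with Python's fnmatch. This is a sketch of the rules described above, not the plugin's actual matching code:

```python
from fnmatch import fnmatch

def should_emit(entity_id, include=None, exclude=None):
    """Apply the documented rules: exclude overrides include;
    a missing include means 'emit everything'."""
    if exclude and any(fnmatch(entity_id, p) for p in exclude):
        return False
    if include:
        return any(fnmatch(entity_id, p) for p in include)
    return True
```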
Result
The emitted result always contains the entity_id, new_state and old_state:
{
    "entity_id": "light.bedroom_lamp",
    "old_state": {
        "state": "off",
        "attributes": {},
        "last_changed": "2019-01-08T18:24:42.087195+00:00",
        "last_updated": "2019-01-08T18:40:40.011459+00:00"
    },
    "new_state": {
        "state": "on",
        "attributes": {},
        "last_changed": "2019-01-08T18:41:06.329699+00:00",
        "last_updated": "2019-01-08T18:41:06.329699+00:00"
    }
}
Example
tasks:
  - name: hass_state
    pull:
      plugin: pnp.plugins.pull.hass.State
      args:
        url: http://localhost:8123
        token: !env HA_TOKEN
        exclude:
          - light.lamp
        include:
          - light.*
    push:
      - plugin: pnp.plugins.push.simple.Echo
http.Server¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.pull.http.Server | pull | none | < 0.10.0 |
Description
Creates a specific route on the builtin api server and listens to any call to that route. Any data passed to the endpoint will be parsed into a dictionary (json) if possible; otherwise the data is passed as-is. See the Result section for the specific payload and examples.
Arguments
name | type | opt. | default | description |
---|---|---|---|---|
prefix_path | str | no | n/a | The route to create for incoming traffic on the builtin api server. See the Example section for reference. |
allowed_methods | List[str] | yes | GET | List of http methods that are allowed. Default is ‘GET’. |
Result
Assumes that you configured your pull with prefix_path = callme
curl -X GET "http://localhost:9999/callme/telephone/now?number=12345&priority=high" --data '{"magic": 42}'
{
    "endpoint": "telephone/now",
    "data": {"magic": 42},
    "levels": ["telephone", "now"],
    "method": "GET",
    "query": {"number": "12345", "priority": "high"},
    "is_json": True,
    "url": "http://localhost:9999/callme/telephone/now?number=12345&priority=high",
    "full_path": "/callme/telephone/now?number=12345&priority=high",
    "path": "/callme/telephone/now"
}
Example
#
# Registers the endpoint /callme to the builtin api server.
# Use curl to try it out:
# curl -X GET "http://localhost:9999/callme/telephone/now?number=12345&priority=high" --data '{"magic": 42}'
#
api:  # You need to enable the api
  port: 9999  # Mandatory
tasks:
  - name: server
    pull:
      plugin: pnp.plugins.pull.http.Server
      args:
        prefix_path: callme  # Results in http://localhost:9999/callme
        allowed_methods:  # Specify which methods are allowed
          - GET
          - POST
    push:
      plugin: pnp.plugins.push.simple.Echo
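How the result fields relate to each other can be reproduced with the standard library. A sketch, assuming prefix_path = callme as in the example:

```python
from urllib.parse import parse_qsl, urlsplit

full_path = "/callme/telephone/now?number=12345&priority=high"
parts = urlsplit(full_path)

path = parts.path                     # "/callme/telephone/now"
endpoint = path[len("/callme/"):]     # "telephone/now"
levels = endpoint.split("/")          # ["telephone", "now"]
query = dict(parse_qsl(parts.query))  # {"number": "12345", "priority": "high"}
```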
monitor.Stats¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.pull.monitor.Stats | poll | none | 0.12.0 |
Description
Emits various metrics / statistics about the host system every interval. Please see the Result section for available metrics.
Result
Emits a dictionary that contains an entry for every collected metric:
{
    "cpu_count": 4,
    "cpu_freq": 2700,
    "cpu_temp": 0.0,
    "cpu_use": 80.0,
    "disk_use": 75.1,
    "load_1m": 2.01171875,
    "load_5m": 1.89501953125,
    "load_15m": 1.94189453125,
    "memory_use": 67.0,
    "rpi_cpu_freq_capped": 0,
    "rpi_temp_limit_throttle": 0,
    "rpi_throttle": 0,
    "rpi_under_voltage": 0,
    "swap_use": 36.1
}
Example
tasks:
  - name: stats
    pull:
      plugin: pnp.plugins.pull.monitor.Stats
      args:
        interval: 10s
        instant_run: true
    push:
      plugin: pnp.plugins.push.simple.Echo
mqtt.Subscribe¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.pull.mqtt.Subscribe | pull | none | < 0.10.0 |
Description
Pulls messages from the specified topic from the given mosquitto mqtt broker (identified by host and port).
Arguments
name | type | opt. | default | description |
---|---|---|---|---|
host | str | no | n/a | Host where the mosquitto broker is running. |
port | int | no | n/a | Port where the mosquitto broker is listening. |
topic | str | no | n/a | Topic to listen for new messages. You can listen to multiple topics by using the #-wildcard (e.g. test/# will listen to all topics underneath test). |
Result
The emitted message will look like this:
{
    "topic": "test/device/device1",
    "levels": ["test", "device", "device1"],
    "payload": "The actual event message"
}
Example
tasks:
  - name: mqtt
    pull:
      plugin: pnp.plugins.pull.mqtt.Subscribe
      args:
        host: localhost
        port: 1883
        topic: test/#
    push:
      plugin: pnp.plugins.push.simple.Echo
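How the #-wildcard selects topics can be sketched as follows. This is a simplified matcher covering only the trailing # case of the mqtt spec (no single-level + wildcard):

```python
def topic_matches(subscription: str, topic: str) -> bool:
    """Match a topic against a subscription with a trailing multi-level '#'.
    Simplified sketch; the broker does the real matching."""
    if subscription.endswith("/#"):
        base = subscription[:-2]
        return topic == base or topic.startswith(base + "/")
    return topic == subscription
```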
net.PortProbe¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.pull.net.PortProbe | poll | none | 0.19.0 |
Description
Periodically establishes a socket connection to check whether anything is listening on a given server and port.
Arguments
name | type | opt. | default | description |
---|---|---|---|---|
port | int | no | n/a | The port to probe if a service is listening |
server | str | yes | localhost | Server name or ip address |
timeout | float | yes | 1.0 | Timeout for remote operations |
Result
Emits a dictionary containing the probed server, the port and whether it was reachable:
{
    "server": "www.google.de",
    "port": 80,
    "reachable": True
}
Example
tasks:
  - name: port_probe
    pull:
      plugin: pnp.plugins.pull.net.PortProbe
      args:
        server: localhost  # Server name or ip address, default is localhost
        port: 9999  # The port to probe if somebody is listening
        interval: 5s  # Probe the port every five seconds ...
        instant_run: true  # ... and run as soon as pnp starts
    push:
      - plugin: pnp.plugins.push.simple.Echo
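Conceptually the probe boils down to a plain TCP connect with a timeout. A sketch with a hypothetical function name:

```python
import socket

def probe(server: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if something accepts a TCP connection on server:port."""
    try:
        with socket.create_connection((server, port), timeout=timeout):
            return True
    except OSError:
        return False
```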
net.Speedtest¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.pull.net.Speedtest | poll | speedtest | 0.25.0 |
Description
Performs a speedtest of your internet connection using speedtest.net
Arguments
No arguments
Result
{
    "download_speed_bps": 13815345.098080076,
    "download_speed_mbps": 13.82,
    "upload_speed_bps": 1633087.176468341,
    "upload_speed_mbps": 1.63,
    "ping_latency": 19.933,
    "result_image": "http://www.speedtest.net/result/10049630297.png",
    "server": {
        "name": "Deutsche Telekom",
        "host": "ham.wsqm.telekom-dienste.de:8080",
        "location": {
            "city": "Hamburg",
            "country": "Germany",
            "lat": "53.5653",
            "lon": "10.0014"
        }
    },
    "client": {
        "isp": "Vodafone DSL",
        "rating": "3.7"
    }
}
Example
tasks:
  - name: speedtest
    pull:
      plugin: pnp.plugins.pull.net.Speedtest
      args:
        num_parallel_requests: 2  # Number of parallel requests
        interval: 1h  # Run every hour
        instant_run: true  # Run as soon as pnp starts
    push:
      - plugin: pnp.plugins.push.simple.Echo
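The *_mbps fields appear to be the *_bps values converted and rounded (an assumption consistent with the sample payload above):

```python
def to_mbps(bps: float) -> float:
    """Convert bits per second to megabits per second, rounded to two decimals."""
    return round(bps / 1_000_000, 2)
```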
net.SSLVerify¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.pull.net.SSLVerify | poll | none | 0.22.0 |
Description
Periodically checks if the ssl certificate of a given host is valid and how many days are remaining before the certificate will expire.
Arguments
name | type | opt. | default | description |
---|---|---|---|---|
host | str | no | n/a | The host to check for its SSL certificate. |
timeout | float | yes | 3.0 | Timeout for remote operation. |
Result
{
    # Envelope
    "host": "www.google.com",
    "payload": {
        "expires_days": 50,  # Remaining days before expiration
        "expires_at": datetime.datetime(2020, 5, 26, 9, 45, 52),  # Python datetime of expiration
        "expired": False  # True if the certificate is expired; otherwise False.
    }
}
Example
tasks:
  - name: ssl_verify
    pull:
      plugin: pnp.plugins.pull.net.SSLVerify
      args:
        host: www.google.com  # Check the ssl certificate for this host
        interval: 1m  # Check the ssl certificate every minute
        instant_run: true  # ... and run as soon as pnp starts
    push:
      - plugin: pnp.plugins.push.simple.Echo
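The underlying check can be sketched with the standard ssl module (the helper names below are hypothetical and this is not the plugin's code):

```python
import socket
import ssl
from datetime import datetime

def parse_not_after(not_after: str) -> datetime:
    """Parse a certificate's 'notAfter' field, e.g. 'May 26 09:45:52 2020 GMT'."""
    return datetime.utcfromtimestamp(ssl.cert_time_to_seconds(not_after))

def cert_expiry(host: str, timeout: float = 3.0) -> datetime:
    """Fetch the peer certificate of host:443 and return its expiry datetime (UTC)."""
    ctx = ssl.create_default_context()
    with socket.create_connection((host, 443), timeout=timeout) as sock:
        with ctx.wrap_socket(sock, server_hostname=host) as ssock:
            return parse_not_after(ssock.getpeercert()["notAfter"])
```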
presence.FritzBoxTracker¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.pull.presence.FritzBoxTracker | poll | fritz | 0.22.0 |
Description
Periodically asks a Fritz!Box router for the devices that were connected in the past or right now.
Note
Extra fritz is only compatible with python 3.6 or higher.
Arguments
name | type | opt. | default | description |
---|---|---|---|---|
host | str | yes | 169.254.1.1 | The IP address of your Fritz!Box. |
user | str | yes | admin | The user to use. |
password | str | yes | <empty> | The password to use. |
offline_delay | int | yes | 0 | Defines how many intervals to wait before marking a device as not connected after the Fritz!Box reported the device as not connected anymore. This is useful for mobile devices that go temporarily to sleep and drop connection. Default is 0 -> Disconnected devices will be instantly reported as disconnected. |
whitelist | List[str] | yes | None | A specific list of devices to track (identified by mac address). If not passed all devices will be fetched. |
Note
By using the default values you should be able to connect to your Fritz!Box, because the necessary operation can be performed anonymously.
Result
{
    "ip": "192.168.178.2",
    "mac": "00:0a:95:9d:68:16",
    "status": True,  # True or False
    "name": "pc1"
}
Example
tasks:
  - name: fritzbox_tracker
    pull:
      plugin: pnp.plugins.pull.presence.FritzBoxTracker
      args:
        host: 169.254.1.1  # IP of your Fritz!Box. Default is 169.254.1.1
        user: admin  # User name. Default is admin
        password: ''  # Password. Default is an empty string
        offline_delay: 0  # How many intervals to wait before marking a device as not connected after the fritzbox reported so
        instant_run: true  # ... and run as soon as pnp starts
    push:
      - plugin: pnp.plugins.push.simple.Echo
tasks:
  - name: fritzbox_tracker_whitelist
    pull:
      plugin: pnp.plugins.pull.presence.FritzBoxTracker
      args:
        host: 169.254.1.1  # IP of your Fritz!Box. Default is 169.254.1.1
        user: admin  # User name. Default is admin
        password: ''  # Password. Default is an empty string
        offline_delay: 0  # How many intervals to wait before marking a device as not connected after the fritzbox reported so
        whitelist:  # A specific list of devices to track (identified by mac address)
          - B0:05:94:77:B8:3B
          - 90:CD:B6:DC:8D:61
        instant_run: true  # ... and run as soon as pnp starts
    push:
      - plugin: pnp.plugins.push.simple.Echo
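The offline_delay logic can be sketched as a small miss counter (a hypothetical class; the plugin's actual bookkeeping may differ):

```python
class OfflineDelaySketch:
    """Report a device as offline only after the router said 'not connected'
    for more than offline_delay consecutive polls."""

    def __init__(self, offline_delay: int):
        self.delay = offline_delay
        self.misses = 0

    def update(self, router_says_connected: bool) -> bool:
        """Feed one poll result; return the status that would be emitted."""
        if router_says_connected:
            self.misses = 0
            return True
        self.misses += 1
        return self.misses <= self.delay
```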
sensor.DHT¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.pull.sensor.DHT | poll | dht | < 0.10.0 |
Description
Periodically polls a dht11 or dht22 (aka am2302) for temperature and humidity readings. The polling interval is controlled by interval.
Arguments
name | type | opt. | default | description |
---|---|---|---|---|
device | str | yes | dht22 | The device to poll (one of dht22, dht11, am2302). |
data_gpio | int | yes | 17 | The data gpio port where the device operates on. |
humidity_offset | float | yes | 0.0 | Positive/Negative offset for humidity. |
temp_offset | float | yes | 0.0 | Positive/Negative offset for temperature. |
Result
{
    "humidity": 65.4,  # in %
    "temperature": 23.7  # in celsius
}
Example
tasks:
  - name: dht
    pull:
      plugin: pnp.plugins.pull.sensor.DHT
      args:
        device: dht22  # Connect to a dht22
        data_gpio: 17  # DHT is connected to gpio port 17
        interval: 5m  # Polls the readings every 5 minutes
        humidity_offset: -5.0  # Subtracts 5% from the humidity reading
        temp_offset: 1.0  # Adds 1 °C to the temperature reading
        instant_run: true
    push:
      - plugin: pnp.plugins.push.simple.Echo
        selector: payload.temperature  # Temperature reading
      - plugin: pnp.plugins.push.simple.Echo
        selector: payload.humidity  # Humidity reading
sensor.MiFlora¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.pull.sensor.MiFlora | poll | miflora | 0.16.0 |
Description
Periodically polls a Xiaomi MiFlora plant sensor for sensor readings (temperature, conductivity, light, …) via btle.
Arguments
name | type | opt. | default | description |
---|---|---|---|---|
mac | str | no | n/a | The device to poll identified by mac address. See below for further instructions |
adapter | str | yes | hci0 | The bluetooth adapter to use (if you have more than one). Default is hci0. |
Note
Start a bluetooth scan to determine the MAC addresses of the sensor (look for Flower care or Flower mate entries) using this command:
$ sudo hcitool lescan
LE Scan ...
F8:04:33:AF:AB:A2 [TV] UE48JU6580
C4:D3:8C:12:4C:57 Flower mate
[...]
Result
Emits a dictionary that contains an entry for every sensor of the plant sensor device:
{
    "conductivity": 800,
    "light": 2000,
    "moisture": 42,
    "battery": 72,
    "temperature": 24.2,
    "firmware": "3.1.9"
}
Example
tasks:
  - name: miflora
    pull:
      plugin: pnp.plugins.pull.sensor.MiFlora
      args:
        mac: 'C4:7C:8D:67:50:AB'  # The mac of your miflora device
        instant_run: true
    push:
      - plugin: pnp.plugins.push.simple.Echo
sensor.Sound¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.pull.sensor.Sound | pull | sound | 0.15.0 |
Description
Listens to the microphone in realtime and searches the stream for specific sound patterns.
Practical example: I use this plugin to recognize my doorbell without tampering with the electrical device ;-)
Arguments
name | type | opt. | default | description |
---|---|---|---|---|
wav_files | List[Dict] | no | n/a | See below for a detailed description |
device_index | int | yes | None | The index of the microphone device. Run pnp_record_sound --list to get the index. If not specified pyAudio will try to find a capable device. |
ignore_overflow | bool | yes | true | If set to True any buffer overflows due to slow realtime processing will be ignored. Otherwise an exception will be thrown and the plugin will abort. |
wav_files
A list of dictionaries containing the configuration for each file that contains an original sound pattern to listen for. Possible keys:
name | type | opt. | default | description |
---|---|---|---|---|
path | str | no | n/a | The path to the original sound file. Absolute or relative to the pnp configuration file. |
mode | str | yes | pearson | Correlation/similarity method. Default is pearson. Try out which one is best for you. |
offset | float | yes | 0.0 | Adjusts sensitivity for similarity. Positive means less sensitive; negative is more sensitive. You should try out 0.1 steps. |
cooldown | Dict | yes | special | See below for a detailed description. |
cooldown
Contains the cooldown configuration. Default is a cooldown period of 10 seconds and no emit of a cooldown event. Possible keys:
name | type | opt. | default | description |
---|---|---|---|---|
period | str | yes | 10s | Prevents the pull from emitting more than one sound detection event per cooldown period. |
emit_event | bool | yes | false | If set to true the end of the cooldown period will emit an event as well. |
Note
- You can list your available input devices: pnp_record_sound --list
- You can record a wav file from an input device: pnp_record_sound <out.wav> --seconds=<seconds_to_record> --index=<idx>
- This one is not pre-installed when using the docker image. Would be grateful if anyone can integrate it.
Result
Will emit the event below when the correlation coefficient is above or equal to the threshold. In this case the component has detected a sound that is similar to one of the given sound patterns.
{
    "type": "sound",  # Type 'sound' means we detected a sound pattern
    "sound": "ding",  # Name of the wav_file without path and extension. To differentiate if you have multiple patterns you listen to
    "corrcoef": 0.82,  # Correlation coefficient probably between [-1;+1] for pearson
    "threshold": 0.6  # Threshold influenced by sensitivity_offset
}
Will emit the event below when you have configured the component to send cooldown events as well.
{
    "type": "cooldown",  # Type 'cooldown' means that we previously identified a sound pattern and the cooldown has happened
    "sound": "ding"  # Name of the wav_file without path and extension. To differentiate if you have multiple patterns you listen to
}
Example
tasks:
  - name: sound_detector
    pull:
      plugin: pnp.plugins.pull.sensor.Sound
      args:
        wav_files:  # The files to compare for similarity
          - path: ding.wav  # Absolute or relative (from the config) path to the wav file
            mode: std  # Use std correlation coefficient [pearson, std]; optional default is pearson
            offset: -0.5  # Adjust sensitivity. Positive means less sensitive; negative is more sensitive. Default is 0.0
          - path: doorbell.wav  # This will use default values for mode and offset (pearson, 0.0)
            cooldown:
              period: 10s  # Prevents the pull to emit more than one sound detection event every 10 seconds
              emit_event: true  # Fire an event after the actual cool down - Useful for binary_sensors to return to their 'off' state
        device_index:  # The index of the microphone device. If not specified pyAudio will try to find a capable device
        ignore_overflow: true  # Some devices might be too slow to process the stream in realtime. Ignore any buffer overflow errors.
    push:
      - plugin: pnp.plugins.push.simple.Echo
simple.Count¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.pull.simple.Count | poll | none | < 0.10.0 |
Description
Emits a counting value every interval seconds, which runs from from_cnt to to_cnt. If to_cnt is None the counter will count to infinity (or more precisely to sys.maxsize).
Arguments
name | type | opt. | default | description |
---|---|---|---|---|
from_cnt | int | yes | 0 | Starting value of the counter. |
to_cnt | int | yes | sys.maxsize | End value of the counter. If not passed set to “infinity” (precise: sys.maxsize ) |
Result
Counter value (int).
Example
tasks:
  - name: count
    pull:
      plugin: pnp.plugins.pull.simple.Count
      args:
        interval: 1s
        from_cnt: 1
        to_cnt: 10
    push:
      plugin: pnp.plugins.push.simple.Echo
simple.Cron¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.pull.simple.Cron | pull | none | 0.16.0 |
Description
Execute push-components based on time constraints configured by cron-like expressions.
This plugin basically wraps cronex to parse cron expressions and to check if any job is pending. See the documentation of cronex for a guide on featured/supported cron expressions.
Arguments
name | type | opt. | default | description |
---|---|---|---|---|
expressions | List[str] | no | n/a | Cron like expressions to configure the scheduler. |
Result
Imagine your cron expression looks like this: */1 * * * * every minute. The pull will emit the text "every minute" once every minute.
Example
tasks:
  - name: cron
    pull:
      plugin: pnp.plugins.pull.simple.Cron
      args:
        expressions:
          - "*/1 * * * * every minute"
          - "0 15 * * * 3pm"
          - "0 0 * * * midnight every day"
          - "0 16 * * 1-5 every weekday @ 4pm"
    push:
      plugin: pnp.plugins.push.simple.Echo
simple.Repeat¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.pull.simple.Repeat | poll | none | < 0.10.0 |
Description
Emits the same repeat object every interval seconds.
Arguments
name | type | opt. | default | description |
---|---|---|---|---|
repeat | Any | no | n/a | The object to emit. |
Result
Emits the repeat object as it is.
Example
tasks:
  - name: repeat
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        repeat: "Hello World"  # Repeats 'Hello World'
        interval: 1s  # Every second
    push:
      plugin: pnp.plugins.push.simple.Echo
simple.RunOnce¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.pull.simple.RunOnce | pull | none | 0.23.0 |
Description
Takes a valid plugins.pull.Polling component, immediately executes it and ventures down the given plugins.push components. If no component to wrap is given it will simply execute the push chain.
Arguments
name | type | opt. | default | description |
---|---|---|---|---|
poll | plugin | yes | None | The polling component you want to run once. If not passed the push chain will be executed. |
Result
Emits the payload of the polling component if given. Otherwise an empty dictionary will be returned.
Example
tasks:
  - name: run_once
    pull:
      plugin: pnp.plugins.pull.simple.RunOnce
    push:
      plugin: pnp.plugins.push.simple.Echo
tasks:
  - name: run_once_wrapped
    pull:
      plugin: pnp.plugins.pull.simple.RunOnce
      args:
        poll:
          plugin: pnp.plugins.pull.monitor.Stats
    push:
      plugin: pnp.plugins.push.simple.Echo
zway.ZwayPoll¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.pull.zway.ZwayPoll | poll | none | < 0.10.0 |
Description
Pulls the specified json content from the zway rest api. The content is specified by the url, e.g. http://<host>:8083/ZWaveAPI/Run/devices will pull all devices and serve the result as json. Specify the polling interval by setting the argument interval. A user / password combination is required when your api is protected against guest access (by default it is). Use multiple pushes and the related selectors to extract the required content like temperature readings (see the examples section for guidance).
Arguments
name | type | opt. | default | description |
---|---|---|---|---|
url | str | no | n/a | The url to poll periodically. |
user | str | no | n/a | Authentication user name. |
password | str | no | n/a | Authentication password. |
Note
Below are some common selector examples to fetch various metrics from various devices
Fibaro Motion Sensor
- Temperature
payload[deviceid].instances[0].commandClasses[49].data[1].val.value
- Luminescence
payload[deviceid].instances[0].commandClasses[49].data[3].val.value
Fibaro Wallplug
- Meter
payload[deviceid].instances[0].commandClasses[50].data[0].val.value
Thermostat (Danfoss / other should work as well)
- Setpoint
payload[deviceid].instances[0].commandClasses[67].data[1].val.value
Battery operated devices
- Battery level
payload[deviceid].instances[0].commandClasses[128].data.last.value
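A selector like the ones above simply navigates the fetched json. Against a plain Python dictionary the same navigation looks like this (the payload below is a made-up miniature of a zway response, not real data):

```python
# Miniature, made-up excerpt of a zway devices response
payload = {
    19: {
        "instances": [
            {"commandClasses": {49: {"data": {1: {"val": {"value": 21.5}}}}}}
        ]
    }
}

# Equivalent of: payload[deviceid].instances[0].commandClasses[49].data[1].val.value
temperature = payload[19]["instances"][0]["commandClasses"][49]["data"][1]["val"]["value"]
```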
Result
Emits the content of the fetched url as it is.
Example
# Please make sure to adjust url and device ids
# Username and Password are injected from environment variables:
# export ZWAY_USER=admin
# export ZWAY_PASSWORD=secret_one
tasks:
  - name: zway
    pull:
      plugin: pnp.plugins.pull.zway.ZwayPoll
      args:
        url: "http://smarthome:8083/ZWaveAPI/Run/devices"
        interval: 5s
        user: !env ZWAY_USER
        password: !env ZWAY_PASSWORD
    push:
      - plugin: pnp.plugins.push.simple.Echo
        # Temperature of fibaro motion sensor
        # You can access the returned json like you would inquire the zway-api
        selector: payload[19].instances[0].commandClasses[49].data[1].val.value
      - plugin: pnp.plugins.push.simple.Echo
        # Luminescence of fibaro motion sensor
        selector: payload[19].instances[0].commandClasses[49].data[3].val.value
Pushes¶
Like a pull, a push supports args to initialize the instance of a push. Besides that you can optionally pass a selector to transform the incoming payload and set the unwrap option to invoke a push for each element of an iterable.
See also
Some pushes do support the envelope feature to alter the arguments for a push during runtime: Envelope
fs.FileDump¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.push.fs.FileDump | push | none | < 0.10.0 |
Description
This push dumps the given payload to a file in the specified directory. If argument file_name is None, a name will be generated based on the current datetime (%Y%m%d-%H%M%S). If file_name is not passed (or None) you should pass extension to specify the extension of the generated file name. Argument binary_mode controls whether the dump is binary (mode=wb) or text (mode=w).
Arguments
name | type | opt. | default | env | description |
---|---|---|---|---|---|
directory | str | yes | . (cwd) | no | The target directory to store the dumps. |
file_name | str | yes | None | yes | The name of the file to dump. If not passed a file name will be automatically generated. |
extension | str | yes | .dump | yes | The extension to use when the file name is automatically generated. |
binary_mode | bool | yes | False | no | If set to True the file will be written in binary mode (wb ); otherwise in text mode (w ). |
Result
Will return an absolute path to the file created.
Example
tasks:
  - name: file_dump
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        repeat: "Hello World"
    push:
      plugin: pnp.plugins.push.fs.FileDump
      args:
        directory: !env WATCH_DIR
        file_name: null  # Auto-generated file (timestamp)
        extension: ".txt"  # Extension of auto-generated file
        binary_mode: false  # text mode
      deps:
        - plugin: pnp.plugins.push.simple.Echo
tasks:
  - name: file_dump
    pull:
      plugin: pnp.plugins.pull.simple.Repeat
      args:
        repeat: "Hello World"
    push:
      plugin: pnp.plugins.push.fs.FileDump
      # Override `file_name` and `extension` via envelope.
      # Instead of an auto generated file, the file '/tmp/hello-world.hello' will be dumped.
      selector:
        data: "lambda data: data"
        file_name: hello-world
        extension: .hello
      args:
        directory: !env WATCH_DIR
        file_name: null  # Auto-generated file (timestamp)
        extension: ".txt"  # Extension of auto-generated file
        binary_mode: false  # text mode
      deps:
        - plugin: pnp.plugins.push.simple.Echo
fs.Zipper¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.push.fs.Zipper | push | none | 0.21.0 |
Description
The push expects a directory or a file path to be passed as the payload. As long it’s a valid path it will zip the directory or the single file and return the absolute path to the created zip file.
Note
You can use a so-called .zipignore file to exclude files and directories from zipping. It works - mostly - like a .gitignore file. To use a .zipignore file you have to put it in the root of the folder you want to zip. An example .zipignore looks like this:
__pycache__/
*.log
This example will ignore all folders called __pycache__ and all files with the extension .log.
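Assuming .gitignore-like glob semantics, the matching can be sketched with fnmatch. This only illustrates the two example patterns above and is not the push's actual matcher:

```python
from fnmatch import fnmatch

def is_ignored(rel_path: str, patterns) -> bool:
    """Sketch: patterns ending in '/' exclude whole directories,
    all other patterns are matched against the file name."""
    parts = rel_path.split("/")
    for pattern in patterns:
        if pattern.endswith("/"):
            if pattern.rstrip("/") in parts[:-1]:
                return True
        elif fnmatch(parts[-1], pattern):
            return True
    return False
```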
Arguments
name | type | opt. | default | env | description |
---|---|---|---|---|---|
source | str | yes | n/a | yes | Specifies the source directory or file to zip. If not passed the source can be specified by the envelope at runtime. |
out_path | str | yes | tmp | no | Specifies the path to the general output path where all target zip files should be generated. If not passed the systems temp directory is used. |
archive_name | str | yes | below | yes | Explicitly specifies the name of the resulting archive. |
The default of `archive_name` will be either the original file name (if you zip a single file) or the name of the zipped directory (if you zip a directory). In both cases the extension `.zip` will be added. If you do not want an extension, you have to provide `archive_name` explicitly.
Result
Will return an absolute path to the zip file created.
Example
tasks:
- name: zipper
pull:
plugin: pnp.plugins.pull.simple.Cron
args:
expressions:
- "*/1 * * * * /path/to/backup"
push:
plugin: pnp.plugins.push.fs.Zipper
args:
out_path: !env BACKUP_DIR
deps:
plugin: pnp.plugins.push.simple.Echo
The next example is useful for dynamically adjusting the archive name to generate unique names for storing multiple backups:
tasks:
- name: zipper
pull:
plugin: pnp.plugins.pull.simple.Cron
args:
expressions:
- "*/1 * * * * /tmp/backup_folder"
push:
plugin: pnp.plugins.push.fs.Zipper
args:
out_path: !env BACKUP_DIR
selector:
archive_name: "lambda payload: '{}_{}'.format(now().isoformat(), 'backup.zip')"
data: "lambda payload: payload"
deps:
plugin: pnp.plugins.push.simple.Echo
hass.Service¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.push.hass.Service | push | none | 0.16.0 |
Description
Calls a home assistant service providing the payload as service-data.
Arguments
name | type | opt. | default | env | description |
---|---|---|---|---|---|
url | str | no | n/a | no | The url to your home assistant instance (e.g. http://hass:8123 ) |
token | str | no | n/a | no | The long live access token to get access to home assistant. |
domain | str | no | n/a | no | The domain of the service to call. |
service | str | no | n/a | no | The name of the service to call. |
timeout | int|float | yes | 5.0 | no | Tell the request to stop waiting for a response after given number of seconds. |
Note
Create a long lived access token: Home Assistant documentation
Result
Returns the payload as-is for better chaining (this plugin can’t add any useful information).
Example
# Calls the frontend.set_theme service to oscillate between a "light" and a "dark" theme
tasks:
- name: hass_service
pull:
plugin: pnp.plugins.pull.simple.Count
args:
interval: 10s
push:
plugin: pnp.plugins.push.hass.Service
selector:
name: "lambda i: 'clear' if i % 2 == 0 else 'dark'"
args:
url: http://localhost:8123
token: !env HA_TOKEN
domain: frontend
service: set_theme
# Calls the notify.notify service to send a message with the actual counter
tasks:
- name: hass_service
pull:
plugin: pnp.plugins.pull.simple.Count
args:
interval: 10s
push:
plugin: pnp.plugins.push.hass.Service
selector:
message: "lambda i: 'Counter: ' + str(i)"
args:
url: http://localhost:8123
token: !env HA_TOKEN
domain: notify
service: notify
http.Call¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.push.http.Call | push | none | 0.21.0 |
Description
Makes a request to a http resource.
Arguments
name | type | opt. | default | env | description |
---|---|---|---|---|---|
url | str | no | n/a | yes | Request url. |
method | str | yes | GET | yes | The http method to use for the request. Must be a valid http method (GET, POST, …). |
fail_on_error | bool | yes | False | yes | If True the push will fail on an http status code other than 2xx. This leads to an error message recorded into the logs and no further execution of any dependencies. |
provide_response | bool | yes | False | no | If True the push will not return the payload as it is, but instead provide the response status_code, fetched url content and a flag if the url content is a json response. This is useful for other push instances in the dependency chain. |
Result
Will return the payload as it is for easy chaining of dependencies.
If `provide_response` is True the push will return a dictionary that looks like this:
{
"status_code": 200,
"data": "fetched url content",
"is_json": False
}
Example
# Simple example calling the built-in rest server
# Oscillates between http method GET and POST. Depending on the fact if the counter is even or not.
api:
port: 9999
tasks:
- name: http_call
pull:
plugin: pnp.plugins.pull.simple.Count
args:
interval: 5s
push:
plugin: pnp.plugins.push.http.Call
selector:
data:
counter: "lambda data: data"
method: "lambda data: 'POST' if int(data) % 2 == 0 else 'GET'"
args:
url: http://localhost:9999/counter
- name: rest_server
pull:
plugin: pnp.plugins.pull.http.Server
args:
prefix_path: counter
allowed_methods:
- GET
- POST
push:
plugin: pnp.plugins.push.simple.Echo
# Demonstrates the use of `provide_response` set to True.
# Call will return a response object to dependent push instances.
api:
port: 9999
tasks:
- name: http_call
pull:
plugin: pnp.plugins.pull.simple.Count
args:
interval: 5s
push:
plugin: pnp.plugins.push.http.Call
args:
url: http://localhost:9999/counter
provide_response: true
deps:
plugin: pnp.plugins.push.simple.Echo
- name: rest_server
pull:
plugin: pnp.plugins.pull.http.Server
args:
prefix_path: counter
allowed_methods:
- GET
push:
plugin: pnp.plugins.push.simple.Nop
ml.FaceR¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.push.ml.FaceR | push | faceR | < 0.10.0 |
Description
FaceR (short for face recognition) tags known faces in images. Output is the image with all faces tagged, either with the known name or with an `unknown_label`. The default for unknown faces is `Unknown`.
Known faces can be ingested either by a directory of known faces (`known_faces_dir`) or by a mapping of `known_faces` (dictionary: name -> [list of face files]).
The `payload` passed to the `push` method is expected to be a valid byte array that represents an image in memory.
Please see the example section for loading physical files into memory.
Note
This plugin is not pre-installed when using the docker image. I would be grateful if anyone could integrate it.
Arguments
name | type | opt. | default | env | description |
---|---|---|---|---|---|
known_faces | Dict[str, str] | yes | None | no | Mapping of a person’s name to a list of images that contain the person’s face. |
known_faces_dir | str | yes | None | no | A directory containing images with known persons (file_name -> person’s name). |
unknown_label | str | yes | Unknown | no | Tag label of unknown faces. |
lazy | bool | yes | False | no | If set to True the face encodings will be loaded when the first push is executed (lazy); otherwise the encodings are loaded when the plugin is initialized (during __init__ ). |
Note
You need to specify either known_faces
or known_faces_dir
Result
Will return a dictionary that contains the bytes of the tagged image (key `tagged_image`) and metadata (`no_of_faces`, `known_faces`):
{
'tagged_image': <bytes of tagged image>,
'no_of_faces': 2,
'known_faces': ['obama']
}
Example
tasks:
- name: faceR
pull:
plugin: pnp.plugins.pull.fs.FileSystemWatcher
args:
path: "/tmp/camera"
recursive: true
patterns: "*.jpg"
ignore_directories: true
case_sensitive: false
events: [created]
load_file: true
mode: binary
base64: false
push:
plugin: pnp.plugins.push.ml.FaceR
args:
known_faces_dir: "/tmp/faces"
unknown_label: "don't know him"
lazy: true
mqtt.Discovery¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.push.mqtt.Discovery | push | none | 0.13.0 |
Description
Pushes an entity to home assistant by publishing it to an mqtt broker. The entity will be enabled for auto discovery by home assistant.
Please see the home assistant docs about mqtt auto discovery.
The mqtt topic is structured like this:
<discovery_prefix>/<component>/[<node_id>/]<object_id>/[config|state]
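The topic template above can be expanded with a small helper. This is an illustration of the template only; the names used are taken from the examples below:

```python
def discovery_topics(discovery_prefix, component, object_id, node_id=None):
    """Expand the mqtt discovery topic template:

    <discovery_prefix>/<component>/[<node_id>/]<object_id>/[config|state]
    """
    parts = [discovery_prefix, component]
    if node_id:
        parts.append(node_id)  # node_id is optional
    parts.append(object_id)
    base = "/".join(parts)
    return {"config": base + "/config", "state": base + "/state"}

print(discovery_topics("homeassistant", "sensor", "fitbit_steps"))
# {'config': 'homeassistant/sensor/fitbit_steps/config',
#  'state': 'homeassistant/sensor/fitbit_steps/state'}
```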
You may also publish `attributes` besides your state. `attributes` can be passed via the `envelope` at runtime. Please see the examples section for further reference.
Arguments
name | type | opt. | default | env | description |
---|---|---|---|---|---|
discovery_prefix | str | no | n/a | no | The prefix for the topic. |
component | str | no | n/a | no | The component / type of the entity, e.g. sensor , light , … |
config | Dict | no | n/a | no | A dictionary of configuration items to configure the entity (e.g. icon -> mdi:soccer ) |
object_id | str | yes | None | yes | The ID of the device. This is only to allow for separate topics for each device and is not used for the entity_id. |
node_id | str | yes | None | yes | A non-interpreted structuring entity to structure the MQTT topic. |
Note
Inside the `config` section you can reference some variables to make the configuration easier. The following variables can be referenced via the `dictmentor` syntax `"{{var::<variable>}}"`:
- discovery_prefix
- component
- object_id
- node_id
- base_topic
- config_topic
- state_topic
- json_attributes_topic
Please see the examples section on how to use that.
Result
Returns the payload as-is for better chaining (this plugin can’t add any useful information).
Example
tasks:
- name: fitbit_steps
pull:
plugin: pnp.plugins.pull.fitbit.Current
args:
config: !env FITBIT_AUTH
instant_run: true
interval: 5m
resources:
- activities/steps
push:
- plugin: pnp.plugins.push.mqtt.Discovery
selector: "data.get('activities/steps')"
args:
host: localhost
discovery_prefix: homeassistant
component: sensor
object_id: fitbit_steps
config:
name: "{{var::object_id}}"
icon: "mdi:soccer"
- name: service_probing
pull:
plugin: pnp.plugins.pull.net.PortProbe
args:
server: server # Server name or ip address, default is localhost
port: 3000 # The port to probe if somebody is listening
timeout: 5
interval: 2m # Probe the port every two minutes ...
instant_run: true # ... and run as soon as pnp starts
push:
- plugin: pnp.plugins.push.mqtt.Discovery
selector:
data: "lambda data: 'OFF' if data.get('reachable') else 'ON'"
object_id: "service"
attributes:
friendly_name: My Service
icon: mdi:monitor-dashboard
args:
host: !env MQTT_HOST
discovery_prefix: !env MQTT_BASE_TOPIC
component: binary_sensor
config:
name: "{{var::object_id}}"
device_class: problem
mqtt.Publish¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.push.mqtt.Publish | push | none | < 0.10.0 |
Description
Will push the given `payload` to an mqtt broker (e.g. `mosquitto`). The broker is specified by `host` and `port`. In addition a `topic` needs to be specified where the `payload` will be pushed to (e.g. `home/living/thermostat`).
The `payload` will be pushed as it is. No transformation is applied. If you need any transformation, use the selector.
Arguments
name | type | opt. | default | env | description |
---|---|---|---|---|---|
host | str | no | n/a | no | The host where the mqtt broker is running. |
port | int | yes | 1883 | no | The port where the mqtt broker is listening |
topic | str | yes | None | yes | The topic to publish the payload to. If set to None the envelope of the payload has to contain a topic key or the push will fail. If both exist the topic from the envelope will overrule the __init__ one. |
retain | bool | yes | False | no | If set to True will mark the message as retained. See the mosquitto man page for further guidance: https://mosquitto.org/man/mqtt-7.html. |
multi | bool | yes | False | no | If set to True the payload is expected to be a dictionary. Each item of that dictionary will be sent individually to the broker. The key of the item will be appended to the configured topic. |
Result
For chaining of pushes the payload is simply returned as it is.
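The `multi` behaviour can be pictured as expanding a payload dictionary into per-item topics. This is a sketch of the described topic joining, not pnp's actual code:

```python
def multi_topics(base_topic: str, payload: dict):
    """Expand a payload dict into (topic, value) pairs.

    Each key of the payload is appended to the configured base topic,
    mirroring the `multi: true` behaviour described above.
    """
    base = base_topic.rstrip("/")
    return [("{}/{}".format(base, key), value) for key, value in payload.items()]

print(multi_topics("devices/localhost/", {"cpu_count": 4, "cpu_usage": 12.5}))
# [('devices/localhost/cpu_count', 4), ('devices/localhost/cpu_usage', 12.5)]
```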
Example
# Demonstrates the basic mqtt.Publish
tasks:
- name: mqtt
pull:
plugin: pnp.plugins.pull.simple.Count
push:
# Will push the counter to the 'home/counter/state' topic
plugin: pnp.plugins.push.mqtt.Publish
args:
host: localhost
topic: home/counter/state
port: 1883
retain: true
# Demonstrates the topic override via envelope
tasks:
- name: mqtt
pull:
plugin: pnp.plugins.pull.simple.Count
args:
interval: 1s
push:
plugin: pnp.plugins.push.mqtt.Publish
# Lets override the topic via envelope mechanism
# Will publish even counts on topic 'even' and uneven counts on 'uneven'
selector:
data: "lambda data: data"
topic: "lambda data: 'test/even' if int(data) % 2 == 0 else 'test/uneven'"
args:
host: localhost
port: 1883
# Demonstrates the use of multi push
tasks:
- name: mqtt
pull:
# Periodically gets metrics about your system
plugin: pnp.plugins.pull.monitor.Stats
args:
instant_run: true
interval: 10s
push:
# Push them to the mqtt
plugin: pnp.plugins.push.mqtt.Publish
args:
host: localhost
topic: devices/localhost/
port: 1883
retain: true
# Each item of the payload-dict (cpu_count, cpu_usage, ...) will be pushed to the broker as multiple items.
# The key of the item will be appended to the topic, e.g. `devices/localhost/cpu_count`.
# The value of the item is the actual payload.
multi: true
notify.Slack¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.push.notify.Slack | push | none | 0.20.0 |
Description
Sends a message to a given Slack channel.
You can specify the channel, the name of the poster, the icon of the poster and a list of users to ping.
Arguments
name | type | opt. | default | env | description |
---|---|---|---|---|---|
api_key | str | no | n/a | no | The api key of your slack oauth token. |
channel | str | no | n/a | yes | The channel to post the message to. |
username | str | yes | PnP | yes | The username of the message poster. |
emoji | str | yes | :robot: | yes | The emoji of the message poster. |
ping_users | List[str] | yes | None | yes | A list of users to ping when the message is posted. By default no one is pinged. |
Result
Will return the payload as it is for easy chaining of dependencies.
Example
tasks:
- name: slack
pull:
plugin: pnp.plugins.pull.simple.Count # Let's count
args:
wait: 10
push:
- plugin: pnp.plugins.push.notify.Slack
selector:
data: "lambda data: 'This is the counter: {}'.format(data)"
# You can override the channel if necessary
# channel: "lambda data: 'test_even' if int(data) % 2 == 0 else 'test_odd'"
# You can override the username if necessary
# username: the_new_user
# You can override the emoji if necessary
# emoji: ':see_no_evil:'
# You can override the ping users if necessary
# ping_users:
# - clone_dede
args:
api_key: !env SLACK_API_KEY # Your slack api key.
channel: test # The channel to post to. Mandatory. Overridable by envelope.
username: slack_tester # The username to show. Default is PnP. Overridable by envelope
emoji: ':pig:' # The emoji to use. Default is :robot: . Overridable by envelope
ping_users: # The users you want to ping when the message is sent. Overridable by envelope
- dede
simple.Echo¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.push.simple.Echo | push | none | < 0.10.0 |
Description
Simply log the passed payload to the default logging instance.
Result
Will return the payload as it is for easy chaining of dependencies.
Example
tasks:
- name: echo
pull:
plugin: pnp.plugins.pull.simple.Count
args:
interval: 1s
from_cnt: 1
to_cnt: 10
push:
plugin: pnp.plugins.push.simple.Echo
simple.Execute¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.push.simple.Execute | push | none | 0.12.0 |
Description
Executes a command with given arguments in a shell of the operating system.
Both `command` and `args` may include placeholders (e.g. `{{placeholder}}`) which are injected at runtime by passing the specified payload after selector transformation. Please see the examples section for further details.
Will return the exit code of the command and optionally the output from stdout and stderr.
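A minimal sketch of how such placeholder injection works, assuming simple dotted-path lookups into the payload (this is an illustration, not pnp's actual template engine):

```python
import re

def render(template: str, payload: dict) -> str:
    """Replace {{dotted.path}} placeholders with values from the payload."""
    def resolve(match):
        value = payload
        for key in match.group(1).strip().split("."):
            value = value[key]  # walk the dotted path
        return str(value)
    return re.sub(r"\{\{([^}]+)\}\}", resolve, template)

payload = {"command": "echo", "labels": {"prefix": "The actual count is"}, "count": "7"}
print(render("{{command}}", payload))                  # echo
print(render("{{labels.prefix}} {{count}}", payload))  # The actual count is 7
```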
Arguments
name | type | opt. | default | env | description |
---|---|---|---|---|---|
command | str | no | n/a | no | The command to execute. May contain placeholders. |
args | List[str] | yes | [] | no | The arguments to pass to the command. Default is no arguments. May contain placeholders. |
cwd | str | yes | special | no | Specifies where to execute the command (working directory). Default is the folder where the invoked pnp configuration file is located. |
timeout | str|float | yes | 5s | no | Specifies how long the worker should wait for the command to finish. |
capture | bool | yes | False | no | If True stdout and stderr output is captured, otherwise not. |
Result
Returns a dictionary that contains the `return_code` and, depending on whether `capture` is set or not, the output from `stdout` and `stderr`. The output is a list of lines.
{
"return_code": 0,
"stdout": ["hello", "dude!"],
"stderr": []
}
Example
tasks:
- name: execute
pull:
plugin: pnp.plugins.pull.simple.Count
args:
interval: 1s
from_cnt: 1
push:
plugin: pnp.plugins.push.simple.Execute
selector:
command: echo
count: "lambda data: str(data)"
labels:
prefix: "The actual count is"
iter: iterations
args:
command: "{{command}}" # The command to execute (passed by selector)
args:
- "{{labels.prefix}}"
- "{{count}}" # The named argument passed at runtime by selector
- "{{labels.iter}}"
timeout: 2s
cwd: # None -> pnp-configuration directory
capture: true # Capture stdout and stderr
deps:
- plugin: pnp.plugins.push.simple.Echo
# How to escape " correctly
tasks:
- name: execute
pull:
plugin: pnp.plugins.pull.simple.Count
args:
interval: 1s
from_cnt: 1
push:
plugin: pnp.plugins.push.simple.Execute
selector:
command: echo
salutation: "\"hello you\""
args:
command: "{{command}}" # The command to execute (passed by selector)
args:
- "{{salutation}}"
timeout: 2s
cwd: # None -> pnp-configuration directory
capture: true # Capture stdout and stderr
deps:
- plugin: pnp.plugins.push.simple.Echo
# Capturing multiline output
tasks:
- name: execute
pull:
plugin: pnp.plugins.pull.simple.Count
args:
interval: 1s
from_cnt: 1
push:
plugin: pnp.plugins.push.simple.Execute
selector:
command: cat multiline.txt
args:
command: "{{command}}" # The command to execute (passed by selector)
cwd: # None -> pnp-configuration directory
capture: true # Capture stdout and stderr
deps:
- plugin: pnp.plugins.push.simple.Echo
simple.Nop¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.push.simple.Nop | push | none | < 0.10.0 |
Description
Executes no operation at all. A call to push(…) just returns the payload. This push is useful when you only need the power of the selector for dependent pushes.
See the example section for an example.
Nop = No operation OR No push ;-)
Result
Will return the payload as it is for easy chaining of dependencies.
Example
tasks:
- name: nop
pull:
plugin: pnp.plugins.pull.simple.Repeat
args:
interval: 1s
repeat:
- 1
- 2
- 3
push:
plugin: pnp.plugins.push.simple.Nop
selector: "data + [4]"
deps:
plugin: pnp.plugins.push.simple.Echo
unwrap: true
simple.Wait¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.push.simple.Wait | push | none | 0.19.0 |
Description
Performs a sleep operation and waits for some time to pass by.
Arguments
name | type | opt. | default | env | description |
---|---|---|---|---|---|
wait_for | float|str | no | n/a | no | The time to wait for before proceeding. You can pass literals such as 5s , 1m ; ints such as 1 , 2 , 3 or floats such as 0.5 . In the end everything will be converted to its float representation (1 => 1.0; 5s => 5.0; 1m => 60.0; 0.5 => 0.5 ) |
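The literal-to-float conversion can be sketched like this. A minimal illustration covering only the literals mentioned above; pnp accepts more:

```python
def to_seconds(value) -> float:
    """Convert duration literals like '5s' or '1m', or plain numbers,
    to their float representation in seconds."""
    if isinstance(value, (int, float)):
        return float(value)
    units = {"s": 1, "m": 60, "h": 3600}
    if value and value[-1] in units:
        return float(value[:-1]) * units[value[-1]]
    return float(value)

print(to_seconds("5s"))  # 5.0
print(to_seconds("1m"))  # 60.0
print(to_seconds(0.5))   # 0.5
```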
Result
Will return the payload as it is for easy chaining of dependencies.
Example
tasks:
- name: wait
pull:
plugin: pnp.plugins.pull.simple.Count # Let's count
args:
interval: 1s
push:
- plugin: pnp.plugins.push.simple.Echo
selector: "'START WAITING: {}'.format(payload)"
deps:
- plugin: pnp.plugins.push.simple.Wait
args:
wait_for: 3s
deps:
- plugin: pnp.plugins.push.simple.Echo
selector: "'END WAITING: {}'.format(payload)"
storage.Dropbox¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.push.storage.Dropbox | push | dropbox | 0.12.0 |
Description
Uploads a provided file to the configured dropbox account.
Arguments
name | type | opt. | default | env | description |
---|---|---|---|---|---|
api_key | str | no | n/a | no | The api key to your dropbox account/app. |
target_file_name | str | yes | None | yes | The file path on the server where to upload the file to. If not specified you have to specify this argument during runtime by setting it in the envelope. |
create_shared_link | bool | yes | True | no | If set to True , the push will create a publicly available link to your uploaded file. |
Result
Returns a dictionary that contains metadata information about your uploaded file. If you uploaded a file named `42.txt`, your result will be similar to the one below:
{
"name": "42.txt",
"id": "HkdashdasdOOOOOadss",
"content_hash": "aljdhfjdahfafuhu489",
"size": 42,
"path": "/42.txt",
"shared_link": "http://someserver/tosomestuff/asdasd?dl=1",
"raw_link": "http://someserver/tosomestuff/asdasd?raw=1"
}
`shared_link` is the one that is publicly available (but only if you know the link). The same goes for `raw_link`, but this link will return the raw file (without the dropbox overhead). Both are `None` if `create_shared_link` is set to `False`.
Example
tasks:
- name: dropbox
pull:
plugin: pnp.plugins.pull.fs.FileSystemWatcher
args:
path: "/tmp"
ignore_directories: true
events:
- created
- modified
load_file: false
push:
- plugin: pnp.plugins.push.storage.Dropbox
args:
api_key: !env DROPBOX_API_KEY
create_shared_link: true # Create a publicly available link
selector:
data: "lambda data: data.source" # Absolute path to file
target_file_name: "lambda data: basename(data.source)" # File name only
timedb.InfluxPush¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.push.timedb.InfluxPush | push | none | < 0.10.0 |
Description
Pushes the given `payload` to an influx database using the line protocol.
You have to specify `host`, `port`, `user`, `password` and the `database`.
The `protocol` is basically a string that will be augmented at push-time with data from the payload. E.g. `{payload.metric},room={payload.location} value={payload.value}` assumes that the payload contains `metric`, `location` and `value`.
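The augmentation is essentially Python string formatting with attribute access on the payload. A sketch with a hypothetical payload object (the field names are illustrative):

```python
from types import SimpleNamespace

# Hypothetical payload - the field names are for illustration only.
payload = SimpleNamespace(metric="temperature", location="living", value=21.5)

# The protocol template from the description above.
protocol = "{payload.metric},room={payload.location} value={payload.value}"
line = protocol.format(payload=payload)
print(line)  # temperature,room=living value=21.5
```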
Arguments
name | type | opt. | default | env | description |
---|---|---|---|---|---|
host | str | no | n/a | no | The host where influx service is running. |
port | int | no | n/a | no | The port where the influx service is listening on. |
user | str | no | n/a | no | Username to use for authentication. |
password | str | no | n/a | no | Related password. |
database | str | no | n/a | no | The database to store the measurement. |
protocol | str | no | n/a | no | Line protocol template (augmented with payload-data). |
Result
For the ability to chain multiple pushes together the payload is simply returned as is.
Example
tasks:
- name: influx_push
pull:
plugin: pnp.plugins.pull.mqtt.Subscribe
args:
host: mqtt
topic: home/#
push:
plugin: pnp.plugins.push.timedb.InfluxPush
selector:
data: "lambda data: data"
args:
host: influxdb
port: 8086
user: root
password: secret
database: home
# This assumes that your topics are structured like this:
# home/<room e.g. living>/<sensor e.g. humidity>
protocol: "{payload.levels[2]},room={payload.levels[1]} value={payload.payload}"
UDFs¶
New in version 0.14.0.
All udfs share the following base arguments:
name | type | opt. | default | description |
---|---|---|---|---|
throttle | str/float | yes | None | If set to a valid duration literal (e.g. 5m ) the return value of the called functions will be cached for the given amount of time. |
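The throttle behaviour amounts to caching a function's return value for a duration. A rough sketch; pnp's implementation may key the cache differently (e.g. per argument set):

```python
import functools
import time

def throttle(seconds: float):
    """Cache a function's return value for the given number of seconds."""
    def decorator(fn):
        cache = {"expires": 0.0, "value": None}
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            now = time.monotonic()
            if now >= cache["expires"]:
                # Cache expired: call the real function and remember the result.
                cache["value"] = fn(*args, **kwargs)
                cache["expires"] = now + seconds
            return cache["value"]
        return wrapper
    return decorator

calls = []

@throttle(60)
def expensive():
    calls.append(1)
    return len(calls)

print(expensive(), expensive())  # 1 1  -> second call is served from the cache
```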
Note
Please note that even when a udf does not require arguments, you still have to specify the `args:` section.
Otherwise it will be interpreted as a regular function and not as a udf.
...
udfs:
- name: fsize
args:
tasks:
...
hass.State¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.udf.hass.State | udf | none | 0.14.0 |
Description
Fetches the state of an entity from home assistant by a rest-api call.
Arguments
name | type | opt. | default | description |
---|---|---|---|---|
url | str | no | n/a | The url to your home assistant instance (e.g. http://hass:8123 ) |
token | str | no | n/a | The long lived access token to get access to home assistant |
timeout | float | yes | 5.0 | Tell the request to abort the waiting for a response after given number of seconds |
Note
Create a long lived access token: Home Assistant documentation
Call Arguments
name | type | opt. | default | description |
---|---|---|---|---|
entity_id | str | no | n/a | The entity to fetch the state |
attribute | str | yes | None | Optionally you can fetch the state of one of the entity's attributes. If not passed the state of the entity itself is fetched |
Result
Returns the current state of the entity or one of its attributes. If the entity is not known to home assistant an exception is raised. If an attribute does not exist, `None` will be returned instead to signal its absence.
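The rest-api call mirrors home assistant's `GET /api/states/<entity_id>` endpoint with a Bearer token. The sketch below only builds the request (pnp performs the actual call internally); the values are illustrative:

```python
def build_state_request(url: str, token: str, entity_id: str):
    """Build endpoint and headers for fetching an entity state via
    home assistant's REST API."""
    endpoint = "{}/api/states/{}".format(url.rstrip("/"), entity_id)
    headers = {
        "Authorization": "Bearer {}".format(token),
        "Content-Type": "application/json",
    }
    return endpoint, headers

endpoint, headers = build_state_request("http://localhost:8123", "<token>", "sun.sun")
print(endpoint)  # http://localhost:8123/api/states/sun.sun
```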
Example
udfs:
# Defines the udf. name is the actual alias you can call in selector expressions.
- name: hass_state
plugin: pnp.plugins.udf.hass.State
args:
url: http://localhost:8123
token: !env HA_TOKEN
tasks:
- name: hass_state
pull:
plugin: pnp.plugins.pull.simple.Repeat
args:
repeat: "Hello World" # Repeats 'Hello World'
interval: 1s # Every second
push:
- plugin: pnp.plugins.push.simple.Echo
# Will only print the data when attribute azimuth of the sun component is above 200
selector: "'azimuth is greater than 200' if hass_state('sun.sun', attribute='azimuth') > 200.0 else SUPPRESS"
- plugin: pnp.plugins.push.simple.Echo
# Will only print the data when the state of the sun component is above 'above_horizon'
selector: "'above_horizon' if hass_state('sun.sun') == 'above_horizon' else SUPPRESS"
simple.Counter¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.udf.simple.Counter | udf | none | 0.14.0 |
Description
Memorizes a counter value which is increased every time you call the udf.
Arguments
name | type | opt. | default | description |
---|---|---|---|---|
init | int | yes | 0 | The initialization value of the counter. |
Result
Returns the current counter.
Example
udfs:
# Defines the udf. name is the actual alias you can call in selector expressions.
- name: counter
plugin: pnp.plugins.udf.simple.Counter
args:
tasks:
- name: countme
pull:
plugin: pnp.plugins.pull.simple.Repeat
args:
repeat: "Hello World" # Repeats 'Hello World'
interval: 1s # Every second
push:
- plugin: pnp.plugins.push.simple.Echo
selector:
data: "lambda data: data"
count: "lambda data: counter()" # Calls the udf
simple.FormatSize¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.udf.simple.FormatSize | udf | none | 0.14.0 |
Description
Returns the size of a file (or whatever) as a human readable size (e.g. bytes, KB, MB, GB, TB, PB). The input is expected to be at byte scale.
Call Arguments
name | type | opt. | default | description |
---|---|---|---|---|
size_in_bytes | int|float | no | n/a | The size in bytes to format to a human readable format. |
Result
Returns the argument in a human readable size format.
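A minimal sketch of such byte-scale formatting; the exact rounding and labels of the udf may differ:

```python
def human_size(size_in_bytes: float) -> str:
    """Format a byte count into a human-readable string (bytes .. PB)."""
    units = ["bytes", "KB", "MB", "GB", "TB", "PB"]
    size = float(size_in_bytes)
    for unit in units:
        if size < 1024.0 or unit == units[-1]:
            return "{:.1f} {}".format(size, unit)
        size /= 1024.0  # step up to the next unit

print(human_size(42))           # 42.0 bytes
print(human_size(5 * 1024**2))  # 5.0 MB
```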
Example
udfs:
# Defines the udf. name is the actual alias you can call in selector expressions.
- name: fsize
plugin: pnp.plugins.udf.simple.FormatSize
args:
tasks:
- name: format_size
pull:
plugin: pnp.plugins.pull.fs.Size
args:
instant_run: true
interval: 5s
fail_on_error: false
paths:
logs: /var/log # directory - recursively determines size
copy: /bin/cp # file
push:
plugin: pnp.plugins.push.simple.Nop
selector: "[(k, v) for k, v in data.items()]" # Transform the dictionary into a list
deps:
plugin: pnp.plugins.push.simple.Echo
unwrap: true # Call the push for each individual item in the list
selector:
object: "lambda d: d[0]"
data: "lambda d: fsize(d[1])"
simple.Memory¶
plugin | type | extra | version |
---|---|---|---|
pnp.plugins.udf.simple.Memory | udf | none | 0.14.0 |
Description
Returns a previously memorized value when called.
Arguments
name | type | opt. | default | description |
---|---|---|---|---|
init | Any | yes | None | The initial memory of the plugin. When not set initially the first call will return the value of new_memory , if specified; otherwise None . |
Call Arguments
name | type | opt. | default | description |
---|---|---|---|---|
new_memory | Any | no | None | After emitting the current memorized value the current memory is overwritten by this value. Will only be overwritten if the parameter is specified. |
Result
Returns the memorized value.
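The described semantics can be sketched as a small class. This is an illustration of the behaviour documented above (emit the current memory, then overwrite it), not pnp's actual implementation:

```python
class Memory:
    """Sketch of the Memory udf: returns the memorized value; when
    new_memory is passed, the memory is overwritten after the current
    value has been emitted. With no prior memory the new value itself
    is returned on the first call."""

    _unset = object()  # sentinel: distinguishes "not passed" from None

    def __init__(self, init=None):
        self.memory = init

    def __call__(self, new_memory=_unset):
        if new_memory is Memory._unset:
            return self.memory
        current = self.memory if self.memory is not None else new_memory
        self.memory = new_memory
        return current

mem = Memory()
print(mem(new_memory=1))  # 1  (no prior memory -> returns the new value)
print(mem())              # 1
print(mem(new_memory=3))  # 1  (emits the old value, then stores 3)
print(mem())              # 3
```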
Example
udfs:
- name: mem
plugin: pnp.plugins.udf.simple.Memory
args:
tasks:
- name: countme
pull:
plugin: pnp.plugins.pull.simple.Count
args:
from_cnt: 1
interval: 1s # Every second
push:
- plugin: pnp.plugins.push.simple.Echo
# Will memorize every uneven count
selector: "mem() if data % 2 == 0 else mem(new_memory=data)"
Appendix¶
Fitbit Authentication¶
To request data from the fitbit account it is necessary to create an app. Go to dev.fitbit.com.
Under `Manage` go to `Register an App`.
For the application website and organization website, name it anything starting with `http://` or `https://`.
Secondly, make sure the OAuth 2.0 Application Type is `Personal`.
Lastly, make sure the Callback URL is `http://127.0.0.1:8080/` in order to get our Fitbit API to connect properly.
After that, click on the agreement box and submit. You will be redirected to a page that contains your `Client ID` and your `Client Secret`.
Next we need to acquire your initial access- and refresh-token.
git clone https://github.com/orcasgit/python-fitbit.git
cd python-fitbit
python3 -m venv venv
source venv/bin/activate
pip install -r dev.txt
./gather_keys_oauth2.py <client_id> <client_secret>
You will be redirected to your browser and asked to login to your fitbit account. Next you can restrict the app to certain data. If everything is fine, your console window should print your access- and refresh-token and also `expires_at`.
Put your `client_id`, `client_secret`, `access_token`, `refresh_token` and `expires_at` into a yaml file and use this file path as the `config` argument of this plugin. Please see the example below:
access_token: <access_token>
client_id: <client_id>
client_secret: <client_secret>
expires_at: <expires_at>
refresh_token: <refresh_token>
That’s it. If your token expires it will be refreshed automatically by the plugin.
Docker¶
Prelude¶
Each new version of pnp will also provide a new docker image.
# Run your container interactive and delete it after use
docker run -it --rm -v <path/to/config.yaml>:/config/config.yaml hazard/pnp:latest
# -d will run the container as a daemon in the background
docker run -d -v <path/to/config.yaml>:/config/config.yaml hazard/pnp:latest
The image tag `latest` points to the latest release. You are encouraged to point to a specific tag.
Use a specific tag¶
To use a specific docker image tag you need to replace `latest` by a valid tag. By default every version gets its own tag. For example, if you want to use pnp version `0.21.0`:
docker run -it --rm -v <path/to/config.yaml>:/config/config.yaml hazard/pnp:0.21.0
You can also checkout the Dockerhub for available image tags. You will also find arm images to run pnp on a raspberry (which is what I do primarily).
Mounting a config directory¶
Sometimes you need to add auxiliary files like a logging configuration, authentication tokens, dictmentor resources, …. To make this possible you can mount a directory instead of just a single configuration file.
docker run -it --rm -v <path/to/folder>:/config hazard/pnp:0.21.0
If you do it this way your main configuration has to be named `config.yaml`.
Examples¶
A set of example configs to inspire you about what you can achieve using pnp.
Backup to dropbox¶
# Every sunday at 1am a backup of the specified directory is created
# and stored at the dropbox service.
tasks:
- name: cron_backup
pull:
plugin: pnp.plugins.pull.simple.Cron
args:
expressions:
- "0 1 * * SUN /tmp"
push:
plugin: pnp.plugins.push.fs.Zipper
deps:
- plugin: pnp.plugins.push.storage.Dropbox
args:
api_key: !env DROPBOX_API_KEY
create_shared_link: false
selector:
data: "lambda payload: payload"
target_file_name: "lambda payload: '{}_{}'.format(str(now()), basename(payload))"
Expose internet speed to home assistant¶
# We use the api to test our speed when we want
# curl -X POST "http://localhost:80/trigger?task=speedtest" -H "accept: application/json"
api:
port: 80
tasks:
- name: speedtest
pull:
plugin: pnp.plugins.pull.net.Speedtest
args:
interval: "0 6 * * *" # Run every morning at 6 am
push:
- plugin: pnp.plugins.push.mqtt.Discovery
selector: "data.get('download_speed_mbps')"
args:
host: localhost
discovery_prefix: homeassistant
component: sensor
object_id: speedtest_download
config:
name: "{{var::object_id}}"
icon: "mdi:cloud-download-outline"
state_topic: "{{var::state_topic}}"
unit_of_measurement: "Mbps"
- plugin: pnp.plugins.push.mqtt.Discovery
selector: "data.get('upload_speed_mbps')"
args:
host: localhost
discovery_prefix: homeassistant
component: sensor
object_id: speedtest_upload
config:
name: "{{var::object_id}}"
icon: "mdi:cloud-upload-outline"
state_topic: "{{var::state_topic}}"
unit_of_measurement: "Mbps"
- plugin: pnp.plugins.push.mqtt.Discovery
selector: "data.get('ping_latency')"
args:
host: localhost
discovery_prefix: homeassistant
component: sensor
object_id: speedtest_ping
config:
name: "{{var::object_id}}"
icon: "mdi:lan-pending"
state_topic: "{{var::state_topic}}"
unit_of_measurement: "ms"
- plugin: pnp.plugins.push.mqtt.Discovery
selector:
data: "lambda data: data.get('server').get('name')"
attributes:
isp: "lambda data: data.get('client').get('isp')"
rating: "lambda data: data.get('client').get('rating')"
host: "lambda data: data.get('server').get('host')"
result_image: "lambda data: data.get('result_image')"
args:
host: localhost
discovery_prefix: homeassistant
component: sensor
object_id: speedtest_host
config:
name: "{{var::object_id}}"
icon: "mdi:cloud-sync-outline"
state_topic: "{{var::state_topic}}"
json_attributes_topic: "{{var::json_attributes_topic}}"
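Each Discovery push above announces one sensor via Home Assistant's MQTT discovery convention, where topics follow the pattern &lt;discovery_prefix&gt;/&lt;component&gt;/&lt;object_id&gt;/&lt;suffix&gt;. The sketch below shows how the {{var::state_topic}} and {{var::json_attributes_topic}} values could be derived; the exact suffixes pnp uses are an assumption based on the Home Assistant convention:

```python
def discovery_topics(discovery_prefix, component, object_id):
    """Build MQTT discovery topics per the Home Assistant convention:
    <prefix>/<component>/<object_id>/{config,state,attributes}."""
    base = "{}/{}/{}".format(discovery_prefix, component, object_id)
    return {
        "config_topic": base + "/config",
        "state_topic": base + "/state",
        "json_attributes_topic": base + "/attributes",
    }

topics = discovery_topics("homeassistant", "sensor", "speedtest_download")
print(topics["state_topic"])  # homeassistant/sensor/speedtest_download/state
```

Since 0.27.0 the Discovery push auto-configures the state and json attributes topics (see the changelog), which is why the config only needs the {{var::...}} placeholders.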
Fitbit to home assistant¶
# Polls your fitbit account for the step count and devices every 5 minutes
# and publishes those metrics to home assistant via mqtt discovery.
# https://www.home-assistant.io/docs/mqtt/discovery/
#
# Please point your environment variable `FITBIT_AUTH` to your authentication
# configuration file.
- name: fitbit_steps
pull:
plugin: pnp.plugins.pull.fitbit.Current
args:
config: !env FITBIT_AUTH
instant_run: true # Run as soon as pnp starts
interval: 5m
resources:
- activities/steps
push:
# Adds a sensor.fitbit_steps to home assistant
- plugin: pnp.plugins.push.mqtt.Discovery
selector: "data.get('activities/steps')"
args:
host: localhost
discovery_prefix: homeassistant
component: sensor
object_id: fitbit_steps
config:
name: "{{var::object_id}}"
icon: "mdi:soccer"
- name: fitbit_devices_battery
pull:
plugin: pnp.plugins.pull.fitbit.Devices
args:
config: !env FITBIT_AUTH
instant_run: true
interval: 5m
push:
# Adds sensor.fb_<device>_battery for each device attached to your fitbit account
- plugin: pnp.plugins.push.mqtt.Discovery
selector:
data: "lambda data: data.get('battery_level')"
object_id: "lambda data: 'fb_{}_battery'.format(data.get('device_version', '').replace(' ', '_').lower())"
unwrap: true
args:
host: localhost
discovery_prefix: homeassistant
component: sensor
config:
name: "{{var::object_id}}"
device_class: "battery"
unit_of_measurement: "%"
# Adds sensor.fb_<device>_lastsync for each device attached to your fitbit account
- plugin: pnp.plugins.push.mqtt.Discovery
selector:
data: "lambda data: data.get('last_sync_time')"
object_id: "lambda data: 'fb_{}_lastsync'.format(data.get('device_version', '').replace(' ', '_').lower())"
unwrap: true
args:
host: localhost
discovery_prefix: homeassistant
component: sensor
config:
name: "{{var::object_id}}"
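The object_id lambdas above derive an entity id from the device name reported by Fitbit. It is a plain string transformation, sketched standalone below (the device names are made-up examples, not values the Fitbit API is guaranteed to return):

```python
def battery_object_id(device_version):
    """Mirrors the selector lambda:
    'fb_{}_battery'.format(device_version.replace(' ', '_').lower())"""
    return "fb_{}_battery".format(device_version.replace(" ", "_").lower())

# Hypothetical device names, just to illustrate the transformation
print(battery_object_id("Charge 2"))    # fb_charge_2_battery
print(battery_object_id("Versa Lite"))  # fb_versa_lite_battery
```

The lastsync variant works the same way with a different suffix, so each physical device gets its own pair of sensors in home assistant.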
Image face recognition¶
# Watches the directory '/tmp/camera' for file changes on image files and
# publishes them to a message queue (base64 encoded).
#
# Then the image data is pulled from the queue and a face recognition
# is executed. The result is a tagged file (known and unknown persons) which
# will be dumped to the specified directory.
#
- name: image_watcher
pull:
plugin: pnp.plugins.pull.fs.FileSystemWatcher
args:
path: "/tmp/camera"
events: [created]
ignore_directories: true
patterns: ['*.jpeg', '*.jpg', '*.png']
load_file: true
base64: true
push:
plugin: pnp.plugins.push.mqtt.Publish
selector: payload.file.content
args:
host: localhost
topic: camera/images
- name: image_processor
pull:
plugin: pnp.plugins.pull.mqtt.Subscribe
args:
host: localhost
topic: camera/images
push:
plugin: pnp.plugins.push.ml.FaceR
selector: b64decode(payload.payload)
args:
known_faces_dir: '/tmp/faces'
lazy: true
deps:
- plugin: pnp.plugins.push.simple.Echo
selector: (payload['known_faces'], payload['no_of_faces'])
- plugin: pnp.plugins.push.fs.FileDump
selector: payload['tagged_image']
args:
directory: '.'
extension: '.png'
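The two tasks above communicate via MQTT: the watcher publishes the file content base64-encoded (base64: true), and the processor reverses that with the b64decode selector helper. The round trip in plain Python (base64.b64decode is the stdlib function; pnp's selector helper of the same name is assumed to behave the same way):

```python
import base64

raw = b"\x89PNG\r\n\x1a\n..."  # pretend image bytes

# Watcher side: `base64: true` encodes the file content for transport
encoded = base64.b64encode(raw).decode("ascii")

# Processor side: the selector `b64decode(payload.payload)` restores the bytes
decoded = base64.b64decode(encoded)

assert decoded == raw
```

Encoding keeps binary image data safe inside the textual MQTT payload at the cost of roughly a third more bytes on the wire.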
Miflora to home assistant¶
# Polls a miflora device every hour at minute 20 and publishes its readings
# to home assistant via mqtt discovery.
# https://www.home-assistant.io/docs/mqtt/discovery/
- name: miflora
pull:
plugin: pnp.plugins.pull.sensor.MiFlora
args:
mac: 'C4:7C:8D:67:50:AB' # The mac of your miflora device
instant_run: false
interval: '20 * * * *'
push:
- plugin: pnp.plugins.push.simple.Nop
selector: "data if data.get('conductivity') else SUPPRESS"
deps:
- plugin: pnp.plugins.push.mqtt.Discovery
selector:
data: "lambda data: data.get('conductivity')"
object_id: "miflora_conductivity"
args:
host: localhost
discovery_prefix: homeassistant
component: sensor
config:
name: "{{var::object_id}}"
unit_of_measurement: "µS/cm"
icon: mdi:flash-circle
friendly_name: Conductivity
- plugin: pnp.plugins.push.simple.Nop
selector: "data if data.get('light') else SUPPRESS"
deps:
- plugin: pnp.plugins.push.mqtt.Discovery
selector:
data: "lambda data: data.get('light')"
object_id: "miflora_light_intensity"
args:
host: localhost
discovery_prefix: homeassistant
component: sensor
config:
name: "{{var::object_id}}"
unit_of_measurement: "lx"
icon: mdi:white-balance-sunny
friendly_name: Light intensity
- plugin: pnp.plugins.push.simple.Nop
selector: "data if data.get('temperature') else SUPPRESS"
deps:
- plugin: pnp.plugins.push.mqtt.Discovery
selector:
data: "lambda data: data.get('temperature')"
object_id: "miflora_temperature"
args:
host: localhost
discovery_prefix: homeassistant
component: sensor
config:
name: "{{var::object_id}}"
unit_of_measurement: "°C"
icon: mdi:thermometer
friendly_name: Temperature
- plugin: pnp.plugins.push.simple.Nop
selector: "data if data.get('battery') else SUPPRESS"
deps:
- plugin: pnp.plugins.push.mqtt.Discovery
selector:
data: "lambda data: data.get('battery')"
object_id: "miflora_battery"
args:
host: localhost
discovery_prefix: homeassistant
component: sensor
config:
name: "{{var::object_id}}"
unit_of_measurement: "%"
device_class: battery
friendly_name: Battery
- plugin: pnp.plugins.push.simple.Nop
selector: "data if data.get('moisture') else SUPPRESS"
deps:
- plugin: pnp.plugins.push.mqtt.Discovery
selector:
data: "lambda data: data.get('moisture')"
object_id: "miflora_moisture"
args:
host: localhost
discovery_prefix: homeassistant
component: sensor
config:
name: "{{var::object_id}}"
unit_of_measurement: "%"
icon: mdi:water-percent
friendly_name: Moisture
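Each Nop push above acts as a guard: its selector evaluates to the payload when the reading is present and to SUPPRESS otherwise, which stops the dependent Discovery push from firing. The pattern sketched in plain Python (SUPPRESS is modeled as a sentinel here; pnp's actual literal differs):

```python
SUPPRESS = object()  # stand-in for pnp's suppress literal

def guard(data, key):
    """Mirrors the selector: "data if data.get(key) else SUPPRESS" """
    return data if data.get(key) else SUPPRESS

reading = {"temperature": 21.5, "battery": 80}  # made-up miflora reading
assert guard(reading, "temperature") is reading    # passed through
assert guard(reading, "conductivity") is SUPPRESS  # push chain stops here
```

Note that a truthiness check like this also suppresses a legitimate 0 reading; an explicit `key in data` test would not.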
Monitoring¶
# Client component
# Sends statistics about the host system (like cpu usage, ram usage, ...)
# to a mqtt broker every 30 seconds.
tasks:
- name: stats
pull:
plugin: pnp.plugins.pull.monitor.Stats
args:
interval: 30s
instant_run: true
push:
plugin: pnp.plugins.push.mqtt.Publish
args:
host: !env MQTT_HOST
topic: devices/my_name/stats
port: 1883
retain: true
# Each item of the payload-dict (cpu_count, cpu_usage, ...) will be pushed to the broker as multiple items.
# The key of the item will be appended to the topic, e.g. `devices/localhost/cpu_count`.
# The value of the item is the actual payload.
multi: true
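With multi: true each key of the stats dictionary is published as its own MQTT message, with the key appended to the base topic. A sketch of that flattening (the stats keys are examples):

```python
def flatten_for_mqtt(base_topic, payload):
    """With `multi: true`, each dict item becomes its own MQTT message:
    the key extends the topic, the value is the message payload."""
    return [("{}/{}".format(base_topic, key), value)
            for key, value in payload.items()]

stats = {"cpu_count": 4, "cpu_usage": 12.5}  # example Stats payload
for topic, value in flatten_for_mqtt("devices/my_name/stats", stats):
    print(topic, value)
# devices/my_name/stats/cpu_count 4
# devices/my_name/stats/cpu_usage 12.5
```

This is what makes the server-side wildcard subscription devices/+/stats/# below work: every metric arrives on its own sub-topic.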
# Server component
# Listens to the mqtt topic where the readings from each client are stored.
# If a new reading arrives it will be sent to an influx database
# to save it for later evaluation.
tasks:
- name: stats_mqtt_pull
pull:
plugin: pnp.plugins.pull.mqtt.MQTTPull
args:
host: !env MQTT_HOST
topic: devices/+/stats/#
push:
plugin: pnp.plugins.push.timedb.InfluxPush
selector: "{'data': payload}"
args:
host: "localhost"
port: 8086
user: "the_user"
password: "the_password"
database: "my_db"
protocol: "{payload.levels[3]},device={payload.levels[1]},domain=stats value={payload.payload}"
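The protocol template references payload.levels, the incoming topic split on "/": for a message on devices/my_name/stats/cpu_usage, levels[1] is the device and levels[3] the measurement name. A sketch of how the template resolves into an InfluxDB line-protocol string (the levels attribute being a plain topic split is an assumption that matches the example topics):

```python
class Message:
    """Minimal stand-in for the envelope the MQTT pull emits."""
    def __init__(self, topic, payload):
        self.levels = topic.split("/")
        self.payload = payload

def to_line_protocol(msg):
    # Mirrors: "{payload.levels[3]},device={payload.levels[1]},domain=stats value={payload.payload}"
    return "{},device={},domain=stats value={}".format(
        msg.levels[3], msg.levels[1], msg.payload)

msg = Message("devices/my_name/stats/cpu_usage", 12.5)
print(to_line_protocol(msg))
# cpu_usage,device=my_name,domain=stats value=12.5
```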
Naive dropbox sync¶
# Every time a file is created or modified in /tmp
# the file is uploaded to dropbox and you are notified
# about it via slack.
#
# You need to set your environment variables properly:
# - DROPBOX_API_KEY
# - SLACK_API_KEY
- name: dropbox_sync_notify
pull:
plugin: pnp.plugins.pull.fs.FileSystemWatcher
args:
path: "/tmp"
ignore_directories: true
events:
- created
- modified
load_file: false
push:
- plugin: pnp.plugins.push.storage.Dropbox
args:
api_key: !env DROPBOX_API_KEY
selector:
data: "lambda data: data.source" # Absolute path to file
target_file_name: "lambda data: basename(data.source)" # File name only
deps:
- plugin: pnp.plugins.push.notify.Slack
args:
api_key: !env SLACK_API_KEY # Your slack api key.
channel: test # The channel to post to. Mandatory. Overridable by envelope.
selector: data.raw_link
Changelog¶
0.28.0
- Feature: Implements yaml tags !env and !include #43
- Fix: Fixes import issue on recent scipy version #42
- Fix: Bumps dependency speedtest-cli to fix a bug that appears on empty server ids #44
0.27.0
- Breaking: Migrates from sanic to fastapi (some endpoints have slightly changed; see openapi docs) #34
- Breaking: Deprecates / Removes push.notify.Pushbullet #35
- Breaking: Deprecates / Removes pull.zway.ZwayReceiver #36
- Breaking: Deprecates / Removes push.mail.GMail #37
- Breaking: Deprecates / Removes pull.traffic.DeutscheBahn #38
- Breaking: Deprecates / Removes pull.camera.MotionEyeWatcher #39
- Breaking: Deprecates / Removes pull.sensor.OpenWeather #40
- Breaking (dev): Refactors the plugin interface to better distinguish between sync / async plugins to provide a simpler developer interface #41
- Breaking: Removes enable_swagger from the api configuration properties #41
- Breaking (dev): Replaces auto_str decorator by ReprMixin based on basic_repr from fastcore package #41
- Feature: Migrates from attrs to pydantic for container objects #41
- Feature: Introduces a runner to start up an application context #41
- Feature: push.mqtt.Discovery: Adds auto configuration of state- and json attributes topics #33
- Feature: push.fs.Zipper: Adds archive_name argument to (dynamically) control the name of the created archive #31
0.26.1
- Fix: pull.net.PortProbe: Catches a socket error when there is no route to the target host #32
0.26.0
- Breaking: Overhaul of the console scripts (see Console Runner) #29
- Feature: Colorful printing of configuration and logging #29
- Feature: Migrates Dockerfile from stretch to buster #27
- Fix: Proper handling of docker stop #30
0.25.0
- Breaking: Removes python 3.5 support
- Breaking: Updates dependencies
- Feature: Adds official python 3.8 support
- Feature: Implements pull.net.Speedtest for testing your internet connection speed
- Fix: Fixes documentation regarding push.fs.Zipper
0.24.0
- Migrates the documentation to Read the Docs: https://pnp.readthedocs.io/en/latest/
- Implements an API
- Breaking: pull.zway.ZwayReceiver integrates its endpoint into the api
- Breaking: pull.http.Server integrates the endpoint into the api
- Breaking: Removes engine override from console runner (because we only have a single engine)
- Breaking: Removes argresolver support
- Breaking: Removes pull.trigger.TriggerBase and pull.trigger.Web. Tasks can be triggered via the api now
0.23.0
- Breaking: Removes engines except for AsyncEngine
- Implements pull.simple.RunOnce to run a polling component or a push chain once (nice for scripts)
- Migration from setup.py to poetry
- Migration from Makefile to python-invoke
- Introduces new configuration concept (interface: ConfigLoader)
0.22.0
- Updates docker base image to python 3.7
- Adds pull.presence.FritzBoxTracker to track known devices on a Fritz!Box
- Adds json_attributes_topic support to push.mqtt.Discovery
- Adds pull.net.SSLVerify to check ssl certificates
0.21.1
- Feature: Enables arm emulator for arm dockerfile to use the Docker Hub automated build
- Bugfix: Removes timeout from component push.storage.Dropbox
0.21.0
- Adds push.fs.Zipper to zip dirs and files in the process chain
0.20.2
- Bugfix: Fixes udf throttling to take arguments into account for result caching
- Refactors udf throttling / caching code to be more pythonic
- Adjusts pull.simple components to act like polling components
0.20.1
- Bugfix: Socket shutdown of pull.net.PortProbe sometimes fails in rare occasions. Is now handled properly
0.20.0
- Adds push.notify.Slack to push a message to a specified slack channel
- Adds pull.trigger.Web to externally trigger poll actions
- Breaking: Slightly changes the behaviour of udf.simple.Memory. See [docs](https://github.com/HazardDede/pnp/blob/master/docs/plugins/udf/simple.Memory/index.md)
0.19.1
- Bugfix: Adds bug workaround in schiene package used by pull.traffic.DeutscheBahn
- Bugfix: Adds exception message truncation for logging.SlackHandler to ensure starting and ending backticks (code-view)
0.19.0
- Adds pull.traffic.DeutscheBahn to poll the Deutsche Bahn website using the schiene package to find the next scheduled trains
- Adds push.simple.Wait to interrupt the execution for some specified amount of time
- Breaking: Component pull.sensor.Sound can now check multiple sound files for similarity. The configuration arguments changed. Have a look at the docs
- Breaking: Fixes ignore_overflow of pull.sensor.Sound plugin (which actually has the opposite effect)
- Breaking: pull.sensor.Sound can optionally trigger a cooldown event after the cooldown period expired. This is useful for a binary sensor to turn it off after the cooldown
- Adds slack logging handler to log messages to a slack channel and optionally ping users
- Adds pull.net.PortProbe plugin to probe a specific port if it’s being used
0.18.0
- Integrates an asyncio featured/powered engine. I think this will be the default in the future. Stay tuned!
0.17.1
- Fixes missing typing-extensions dependency
- Fixes urllib3 versions due to requests incompatibilities
0.17.0
- Adjusts inline documentation - refers to github documentation
- Refactors a majority of codebase to comply to pylint linter
- Integrates yamllint as linter
- Refactors RetryDirective (namedtuple to attr class)
- Adds decorators for parsing the envelope in a push context
- Breaking: Removes push.simple.Execute and replaces it with push.simple.TemplatedExecute
- Adjusts method logger in plugin classes to automatically prepend plugin name
- Integrates coveralls
- Adds pull.ftp.Server plugin
- Adds lazy configuration property to push.ml.FaceR (basically to test initialization of FaceR without installing face-recognition and dlib)
- Adds pull.fs.Size plugin
- Adds typing for most of the core codebase and adds mypy as linter
0.16.0
- Adds ignore_overflow argument to pull.sensor.Sound to ignore buffer overflow errors on slow devices
- Possible Breaking: Adds raspberrypi specific stats (under voltage, throttle, …) to pull.monitor.stats
- Professionalizes docker image build process / Testing the container
- Documentation cosmetics
- Adds cron-like pull pull.simple.Cron
- Adds pull.camera.MotionEyeWatcher to watch a MotionEye directory to emit events
- Adds push.hass.Service to call home assistant services by rest-api
- Breaking: New default value of cwd argument of push.simple.Execute is now the folder where the invoked pnp-configuration is located and not the current working directory anymore
- Adds push.simple.TemplatedExecute as a replacement for push.simple.Execute
- Adds cron-expressions to polling base class
- Adds pull.sensor.MiFlora plugin to periodically poll xiaomi miflora devices
0.15.0
- Adds push.mail.GMail to send e-mails via the gmail api
- Adds throttle-feature to user defined functions via base class
- Adds pull.sensor.Sound to listen to the microphone’s sound stream for occurrence of a specified sound
0.14.0
- Adds UDF (user defined functions)
- Adds UDF udf.hass.State to request the current state of an entity (or one of its attributes) from home assistant
- Makes selector expressions in complex structures (dicts / lists) more explicit using lambda expressions with mandatory payload argument. This will probably break configs that use complex expressions containing lists and/or dictionaries
- Adds pull.hass.State to listen to state changes in home assistant
- Fixes bug in pull.fitbit.Goal when fetching weekly goals (so far daily goals were fetched too)
- Adds UDF udf.simple.Memory to memorize values to access them later
0.13.0
- Adds pull.fitbit.Current, pull.fitbit.Devices, pull.fitbit.Goal plugins to request data from fitbit api
- Adds push.mqtt.Discovery to create mqtt discovery enabled devices for home assistant. [Reference](https://www.home-assistant.io/docs/mqtt/discovery/)
- Adds unwrapping-feature to pushes
0.12.0
- Adds additional argument multi (default False) to push.mqtt.MQTTPush to send multiple messages to the broker if the payload is a dictionary (see plugin docs for reference)
- Adds plugin pull.monitor.Stats to periodically emit stats about the host system
- Adds plugin push.notify.Pushbullet to send message via the pushbullet service
- Adds plugin push.storage.Dropbox to upload files to a dropbox account/app
- Adds feature to use complex lists and/or dictionary constructs in selector expressions
- Adds plugin pull.gpio.Watcher (extra gpio) to watch gpio pins for state changes. Only works on raspberry
- Adds plugin push.simple.Execute to run commands in a shell
- Adds extra http-server to optionally install flask and gevent when needed
- Adds utility method to check for installed extras
- Adds -v | --verbose flag to pnp runner to switch logging level to DEBUG. No matter what…
0.11.3
- Adds auto-mapping magic to the pull.zway.ZwayReceiver.
- Adds humidity and temperature offset to dht
0.11.2
- Fixes error catching of run_pending in Polling base class
0.11.1
- Fixes resolution of logging configuration on startup
0.11.0
- Introduces the pull.zway.ZwayReceiver and pull.sensor.OpenWeather component
- Introduces logging configurations. Integrates dictmentor package to augment configuration
0.10.0
- Introduces engines. You are not enforced to explicitly use one and backward compatibility with legacy configs is given (actually the example configs work as they did before the change). So there shouldn’t be any Breaking change.