Skip to content

Commit

Permalink
[udf] Unlock JavaScript for user-defined functions
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Jan 5, 2024
1 parent 772f9f5 commit 8cd99f7
Show file tree
Hide file tree
Showing 11 changed files with 292 additions and 35 deletions.
1 change: 1 addition & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mqttwarn changelog

in progress
===========
- [udf] Unlock JavaScript for user-defined functions. Thanks, @extremeheat.


2023-10-15 0.35.0
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ RUN --mount=type=cache,id=pip,target=/root/.cache/pip \
true \
&& pip install --upgrade pip \
&& pip install --prefer-binary versioningit wheel \
&& pip install --use-pep517 --prefer-binary '/src'
&& pip install --use-pep517 --prefer-binary '/src[javascript]'

# Uninstall build prerequisites again.
RUN apt-get --yes remove --purge git && apt-get --yes autoremove
Expand Down
79 changes: 78 additions & 1 deletion docs/configure/transformation.md
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,68 @@ the invocation of your function, you will create an endless loop.
:::


(user-defined-functions-languages)=
### Using other languages

This section is about using languages other than Python within the mqttwarn transformation
subsystem.

#### Introduction
Imagine you are running a large cloud infrastructure for [4IR] purposes, where hardware
appliances or industrial machines submit telemetry data to concentrators or hubs, in order
to converge them to a message bus like [Kafka], where, because you favoured a [FaaS]
solution, your [serverless] function handler will consume the telemetry messages, in
order to conduct further processing, like to filter, transform, and store the data.
Chances are that you used the [Serverless Framework], built on [Node.js], so your application
infrastructure is effectively based on a bunch of [JavaScript] code.

Now, you know how the story ends. The infrastructure costs exploded, you had to fire your
engineering team, and now you are sitting in front of a mixture of infrastructure setup
recipes and actual application code, and you are not able to make any sense of that at all.
After some digging, you are able to identify the spot where the real application logic
evaluates a set of transformation rules, in order to mangle inbound data appropriately
into outbound data, written in JavaScript.

You quickly realize that the same thing could effectively be solved using [MQTT] and Python,
and operated on a single server machine, because you are doing only a few hundred requests
per second anyway. However, you don't have the time, because, well, the customer quickly
needs a fix because they added another telemetry data field waiting to be consumed and
handled appropriately. Well, and it's Kafka anyway, how would one get rid of _that_?

After a glimpse of desperation, you are reading the fine manual again, and catch up with
the fact that telemetry data _is actually ingested using MQTT!_ That is useful, indeed!
Remembering conversations about a thing called _mqttwarn_ at the Späti the other day,
and that it gained the ability to run JavaScript code recently, in order to bring cloud
computing techniques back to personal computing, you think it would be feasible to change
that architecture of your system in the blink of an eye.

There you go: You rip out the JavaScript transformation rules into a single-file version,
export its main entry point symbol, configure mqttwarn to use `functions = mycloud.js`,
and adjust its settings to use your MQTT broker endpoint at the beginning of the data
pipeline, invoke mqttwarn, and turn off Kafka. It works!

:::{note}
Rest assured we are overexaggerating a bit, and [Kafka] can only be compared to [MQTT]
if you are also willing to compare apples with oranges, but you will get the point that
we believe simpler systems are more often than not able to solve problems equally well,
if not more efficient, both at runtime, and on details of maintenance and operation.

Other than this, every kind of system migration should be conducted with better planning
than outlined in our rant above.
:::

#### JavaScript

For running user-defined functions code written in [JavaScript], mqttwarn uses the
excellent [JSPyBridge] package. For adding JavaScript support to mqttwarn, install
it using pip like `pip install --upgrade 'mqttwarn[javascript]'`, or use one of the
available [OCI images](#using-oci-image).

You can find an example implementation for a `filter` function written in JavaScript
at the [OwnTracks-to-ntfy example tutorial](#owntracks-ntfy-variants-udf).



## User-defined function examples

In this section, you can explore a few example scenarios where user-defined
Expand Down Expand Up @@ -507,6 +569,12 @@ was received on, here `owntracks/jane/phone`. The name of the section will be
the `section` argument, here `owntracks/#/phone`.
:::

:::{note}
The recipe about how to use [](#owntracks-ntfy-variants-udf) demonstrates
corresponding examples for writing a `filter` function.
:::


(decode-topic)=
### `alldata`: Decoding topic names

Expand Down Expand Up @@ -631,10 +699,19 @@ weather,topic=tasmota/temp/ds/1 temperature=19.7 1517525319000
```


[4IR]: https://en.wikipedia.org/wiki/Fourth_Industrial_Revolution
[FaaS]: https://en.wikipedia.org/wiki/Cloud_computing#Serverless_computing_or_Function-as-a-Service_(FaaS)
[geo-fence]: https://en.wikipedia.org/wiki/Geo-fence
[InfluxDB line format]: https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/
[JavaScript]: https://en.wikipedia.org/wiki/JavaScript
[Jinja2 templates]: https://jinja.palletsprojects.com/templates/
[JSPyBridge]: https://pypi.org/project/javascript/
[Kafka]: https://en.wikipedia.org/wiki/Apache_Kafka
[MQTT]: https://en.wikipedia.org/wiki/MQTT
[Node.js]: https://en.wikipedia.org/wiki/Node.js
[OwnTracks]: https://owntracks.org
[Tasmota]: https://github.com/arendst/Tasmota
[Tasmota JSON status response for a DS18B20 sensor]: https://tasmota.github.io/docs/JSON-Status-Responses/#ds18b20
[OwnTracks]: https://owntracks.org
[serverless]: https://en.wikipedia.org/wiki/Serverless_computing
[Serverless Framework]: https://github.com/serverless/serverless
[waypoint]: https://en.wikipedia.org/wiki/Waypoint
5 changes: 5 additions & 0 deletions docs/usage/pip.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ that.
pip install --upgrade mqttwarn
```

Add JavaScript support for user-defined functions.
```bash
pip install --upgrade 'mqttwarn[javascript]'
```

You can also add support for a specific service plugin.

```bash
Expand Down
35 changes: 35 additions & 0 deletions examples/owntracks-ntfy/mqttwarn-owntracks.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
*
* Forward OwnTracks low-battery warnings to ntfy.
* https://mqttwarn.readthedocs.io/en/latest/examples/owntracks-battery/readme.html
*
*/

// mqttwarn filter function, returning true if the message should be ignored.
// In this case, ignore all battery level telemetry values above a certain threshold.
function owntracks_batteryfilter(topic, message) {
let ignore = true;
let data;

// Decode inbound message.
try {
data = JSON.parse(message);
} catch {
data = null;
}

// Evaluate filtering rule.
if (data && "batt" in data && data.batt !== null) {
ignore = Number.parseFloat(data.batt) > 20;
}

return ignore;
}

// Status message.
console.log("Loaded JavaScript module.");

// Export symbols.
module.exports = {
"owntracks_batteryfilter": owntracks_batteryfilter,
};
71 changes: 71 additions & 0 deletions examples/owntracks-ntfy/readme-variants.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
---
orphan: true
---

(owntracks-ntfy-variants)=

# OwnTracks-to-ntfy setup variants


## About

This section informs you about additional configuration and operation variants of the
[](#owntracks-ntfy-recipe) recipe. For example, you may want to use Docker or Podman
to run both mqttwarn and ntfy, or you may want to use another language than Python to
implement your filtering function.


## Docker and Podman

### Running mqttwarn as container
This command will run mqttwarn in a container, using the `docker` command to launch it.
Alternatively, `podman` can be used. It expects an MQTT broker to be running on `localhost`,
so it uses the `--network=host` option. The command will mount the configuration file and
the user-defined functions file correctly, and will invoke mqttwarn with the corresponding
`--config-file` option.
```shell
docker run --rm -it --network=host --volume=$PWD:/etc/mqttwarn \
ghcr.io/jpmens/mqttwarn-standard \
mqttwarn --config-file=mqttwarn-owntracks.ini
```

### Running ntfy as container
While this tutorial uses the ntfy service at ntfy.sh, it is possible to run your own
instance. For example, use Docker or Podman.
```shell
docker run --name=ntfy --rm -it --publish=5555:80 \
binwiederhier/ntfy serve --base-url="http://localhost:5555"
```
In this case, please adjust the ntfy configuration section `[config:ntfy]` to use
a different URL, and make sure to restart mqttwarn afterwards.
```ini
[config:ntfy]
targets = {'testdrive': 'http://localhost:5555/testdrive'}
```


(owntracks-ntfy-variants-udf)=

## Alternative languages for user-defined functions

### JavaScript

In order to try that on the OwnTracks-to-ntfy example, use the alternative
`mqttwarn-owntracks.js` implementation by adjusting the `functions` setting within the
`[defaults]` section of your configuration file, and restart mqttwarn.
```ini
[defaults]
functions = mqttwarn-owntracks.js
```

The JavaScript function `owntracks_batteryfilter()` implements the same rule as the
previous one, which was written in Python.

:::{literalinclude} mqttwarn-owntracks.js
:language: javascript
:::

:::{attention}
The feature to run JavaScript code is currently considered to be experimental.
Please use it responsibly.
:::
33 changes: 6 additions & 27 deletions examples/owntracks-ntfy/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,34 +96,13 @@ echo 'foobar' | mosquitto_pub -t 'owntracks/testdrive' -l

## Appendix

This section demonstrates a few alternative methods for solving different aspects of this
recipe, and also includes administrative information.

### Running mqttwarn as container
This command will run mqttwarn in a container, using the `docker` command to launch it.
Alternatively, `podman` can be used. It expects an MQTT broker to be running on `localhost`,
so it uses the `--network=host` option. The command will mount the configuration file and
the user-defined functions file correctly, and will invoke mqttwarn with the corresponding
`--config-file` option.
```shell
docker run --rm -it --network=host --volume=$PWD:/etc/mqttwarn \
ghcr.io/jpmens/mqttwarn-standard \
mqttwarn --config-file=mqttwarn-owntracks.ini
```
### Configuration and operation variants

### Running ntfy as container
While this tutorial uses the ntfy service at ntfy.sh, it is possible to run your own
instance. For example, use Docker or Podman.
```shell
docker run --name=ntfy --rm -it --publish=5555:80 \
binwiederhier/ntfy serve --base-url="http://localhost:5555"
```
In this case, please adjust the ntfy configuration section `[config:ntfy]` to use
a different URL, and make sure to restart mqttwarn afterwards.
```ini
[config:ntfy]
targets = {'testdrive': 'http://localhost:5555/testdrive'}
```
There are different variants to configure and operate this setup. For example,
you may want to use Docker or Podman to run both mqttwarn and ntfy, or you may
want to use another language than Python to implement your filtering function.
We summarized the available options on the [](#owntracks-ntfy-variants) page,
together with corresponding guidelines how to use them.

### Backlog
:::{todo}
Expand Down
48 changes: 44 additions & 4 deletions mqttwarn/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import os
import re
import string
import threading
import types
import typing as t
from pathlib import Path
Expand Down Expand Up @@ -147,8 +148,10 @@ def load_module_from_file(path: t.Union[str, Path]) -> types.ModuleType:
loader = importlib.machinery.SourceFileLoader(fullname=name, path=str(path))
elif path.suffix == ".pyc":
loader = importlib.machinery.SourcelessFileLoader(fullname=name, path=str(path))
elif path.suffix in [".js", ".javascript"]:
return load_source_js(name, str(path))

Check warning on line 152 in mqttwarn/util.py

View check run for this annotation

Codecov / codecov/patch

mqttwarn/util.py#L152

Added line #L152 was not covered by tests
else:
raise ImportError(f"Loading file failed (only .py and .pyc): {path}")
raise ImportError(f"Loading file type failed (only .py, .pyc, .js, .javascript): {path}")
spec = importlib.util.spec_from_loader(loader.name, loader)
if spec is None:
raise ModuleNotFoundError(f"Failed loading module from file: {path}")
Expand Down Expand Up @@ -216,16 +219,17 @@ def import_symbol(name: str, parent: t.Optional[types.ModuleType] = None) -> typ
return import_symbol(remaining_names, parent=module)


def load_functions(filepath: t.Optional[str] = None) -> t.Optional[types.ModuleType]:
def load_functions(filepath: t.Optional[t.Union[str, Path]] = None) -> t.Optional[types.ModuleType]:

if not filepath:
return None

filepath = str(filepath)

if not os.path.isfile(filepath):
raise IOError("'{}' not found".format(filepath))

py_mod = load_module_from_file(filepath)
return py_mod
return load_module_from_file(filepath)


def load_function(name: str, py_mod: t.Optional[types.ModuleType]) -> t.Callable:
Expand Down Expand Up @@ -277,3 +281,39 @@ def load_file(path: t.Union[str, Path], retry_tries=None, retry_interval=0.075,
except: # pragma: nocover
pass
return reader


def imp_new_module(name):
"""
Create a new module.
The module is not entered into sys.modules.
"""
return types.ModuleType(name)

Check warning on line 293 in mqttwarn/util.py

View check run for this annotation

Codecov / codecov/patch

mqttwarn/util.py#L293

Added line #L293 was not covered by tests


def module_factory(name, variables):
"""
Create a synthetic Python module object.
Derived from:
https://www.oreilly.com/library/view/python-cookbook/0596001673/ch15s03.html
"""
module = imp_new_module(name)
module.__dict__.update(variables)
module.__file__ = "<synthesized>"
return module

Check warning on line 306 in mqttwarn/util.py

View check run for this annotation

Codecov / codecov/patch

mqttwarn/util.py#L303-L306

Added lines #L303 - L306 were not covered by tests


def load_source_js(mod_name, filepath):
"""
Load a JavaScript module, and import its exported symbols into a synthetic Python module.
"""
import javascript

Check warning on line 313 in mqttwarn/util.py

View check run for this annotation

Codecov / codecov/patch

mqttwarn/util.py#L313

Added line #L313 was not covered by tests

js_code = load_file(filepath, retry_tries=0).read().decode("utf-8")
module = {}
javascript.eval_js(js_code)
threading.Event().wait(0.01)
return module_factory(mod_name, module["exports"])

Check warning on line 319 in mqttwarn/util.py

View check run for this annotation

Codecov / codecov/patch

mqttwarn/util.py#L315-L319

Added lines #L315 - L319 were not covered by tests
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ ignore = [
extend-exclude = [
# Always.
".venv*",
"example_*",
"tests/etc/functions_bad.py",
# Temporary.
"examples",
Expand Down
7 changes: 6 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@
"gspread>=2.1.1",
"oauth2client>=4.1.2",
],
"javascript": [
"javascript==1!1.0.1; python_version>='3.7'",
],
"mysql": [
"mysql",
],
Expand Down Expand Up @@ -200,6 +203,8 @@
"Operating System :: POSIX",
"Operating System :: Unix",
"Operating System :: MacOS",
"Operating System :: Microsoft :: Windows",
"Programming Language :: JavaScript",
"Programming Language :: Python",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
Expand Down Expand Up @@ -229,7 +234,7 @@
author="Jan-Piet Mens, Ben Jones, Andreas Motl",
author_email="jpmens@gmail.com",
url="https://github.com/mqtt-tools/mqttwarn",
keywords="mqtt notification plugins data acquisition push transformation engine mosquitto",
keywords="mqtt notification plugins data acquisition push transformation engine mosquitto python javascript",
packages=find_packages(),
include_package_data=True,
package_data={
Expand Down
Loading

0 comments on commit 8cd99f7

Please sign in to comment.