The code below, which creates a subscription and dumps the result into the diagnostics log, is similar to the source code of the library in question. You can run this directly with the Python Runtime app.
import requests
from urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
session = requests.Session()
session.verify = False
response = session.post("https://localhost/identity-manager/api/v2/auth/token", json={"name":"boschrexroth","password":"boschrexroth"}).json()
token = response['access_token']
data = {"Authorization": f"Bearer {token}"}
with session.get("https://localhost/automation/api/v2/events?nodes=framework/metrics/system/cpu-utilisation-percent&publishIntervalMs=5000", headers=data, stream=True) as stream:
for line in stream.iter_lines():
tmp = line.decode("utf-8").split(":",1)
if len(tmp) > 1 and tmp[0] == 'data':
print(tmp[1], flush=True)
Output:
... View more