pmalhaire / xk6-mqtt Goto Github PK
View Code? Open in Web Editor NEWk6 mqtt extension
License: Apache License 2.0
k6 mqtt extension
License: Apache License 2.0
Related: #21
Hi, thank you for the extension. I ran into the error in the title after building with the default xk6 build latest --with github.com/pmalhaire/xk6-mqtt
. I was able to work around by specifying a commit like xk6 build latest --with github.com/pmalhaire/xk6-mqtt@231ecd388e503ca4a59eb346e03ac8a66011e12e
. I see the latest release was created back in July but the commit that fixes the issue was made in November so perhaps it's just a matter of creating a new release.
This is because the Message argument on Publish function is declared as string.
Description
First of all, thank you for this extension! I have been using it for a little while and it has worked wonderfully. Just recently xk6 has started failing to build when I include this extension.
Steps to reproduce:
xk6 build --with github.com/pmalhaire/xk6-mqtt@latest
Example output
~/stresstesting (master) $ xk6 build --with github.com/pmalhaire/xk6-mqtt@latest
... etc.
2022/11/11 23:52:36 [INFO] exec (timeout=0s): /usr/local/opt/[email protected]/bin/go mod edit -require go.k6.io/k6@latest
2022/11/11 23:52:36 [INFO] exec (timeout=0s): /usr/local/opt/[email protected]/bin/go mod tidy
go: finding module for package github.com/pmalhaire/xk6-mqtt
go: found github.com/pmalhaire/xk6-mqtt in github.com/pmalhaire/xk6-mqtt v0.38.0
2022/11/11 23:52:37 [INFO] exec (timeout=0s): /usr/local/opt/[email protected]/bin/go mod edit -require github.com/pmalhaire/xk6-mqtt@latest
2022/11/11 23:52:37 [INFO] exec (timeout=0s): /usr/local/opt/[email protected]/bin/go mod tidy
2022/11/11 23:52:37 [INFO] Build environment ready
2022/11/11 23:52:37 [INFO] Building k6
2022/11/11 23:52:37 [INFO] exec (timeout=0s): /usr/local/opt/[email protected]/bin/go mod tidy
2022/11/11 23:52:37 [INFO] exec (timeout=0s): /usr/local/opt/[email protected]/bin/go build -o /Users/patlevy/Exosite/exosense/loadtesting/murano-stress-test/stresstesting/k6 -ldflags -w -s -trimpath
# github.com/pmalhaire/xk6-mqtt
../../../../../go/pkg/mod/github.com/pmalhaire/[email protected]/publish.go:40:18: unknown field 'Metric' in struct literal of type metrics.Sample
../../../../../go/pkg/mod/github.com/pmalhaire/[email protected]/subscribe.go:71:19: unknown field 'Metric' in struct literal of type metrics.Sample
2022/11/11 23:53:02 [INFO] Cleaning up temporary folder: /Users/patlevy/stresstesting/buildenv_2022-11-11-2352.1238861476
2022/11/11 23:53:02 [FATAL] exit status 2
Hi,
i tried to run the test example but i get the following error
/\ |‾‾| /‾‾/ /‾‾/
/\ / \ | |/ / / /
/ \/ \ | ( / ‾‾\
/ \ | |\ \ | (‾) |
/ __________ \ |__| \__\ \_____/ .io
ERRO[0000] TypeError: Value is not an object: undefined
at file:///C:/Users/esskar/test.js:42:16(88) hint="script exception"
```
Line 42 is `let publisher = new mqtt.Client(`
Have i missed something?
Hi,
is it possible to make Encrypted connection via TLS?
Thanks
Samer
k6 v0.38.0 has been released and it contains a breaking change for the stats
package. We merged it in the metrics
package. The current build of the extension could be affected.
stats
package to the relative metrics
definition. The API is not changed so it should be just a find and replace of the package.I believe, the isConnected()
function is not working properly. I think once connected, the isConnected()
function will always return True
, no matter the actual status.
What I did:
What did I expect:
isConnected()
returns false
What happened:
isConnected()
returns true
always
For QoS 1 and 2, I can work around it, using the publish()
function.
But for QoS 0 even the publish()
function doesn't throw an error. Which makes it hard to test a broker, if it actually bails out.
Following a test script, maybe I did something wrong:
import { Counter, Gauge, Rate, Trend } from 'k6/metrics';
import {
check, fail
} from 'k6';
export const options = {
discardResponseBodies: true,
scenarios: {
contacts: {
executor: 'constant-arrival-rate',
// How long the test lasts
duration: '60s',
// How many iterations per timeUnit
rate: 1,
// Start `rate` iterations per second
timeUnit: '1s',
// Pre-allocate 2 VUs before starting the test
preAllocatedVUs: 2,
// Spin up a maximum of 50 VUs to sustain the defined
// constant arrival rate.
maxVUs: 50,
},
},
};
const mqtt = require('k6/x/mqtt');
const rnd_count = 2000;
// create random number to create a new topic at each run
let rnd = Math.round(Math.random() * rnd_count);
// conection timeout (ms)
let connectTimeout = 2000
// publish timeout (ms)
let publishTimeout = 2000
// connection close timeout (ms)
let closeTimeout = 2000
// Mqtt topic one per VU
const k6Topic = `benchmark/${rnd}/${__VU}`;
// Connect IDs one connection per VU
const k6PubId = `k6-pub-${rnd}-${__VU}`;
const host = "mqtt://192.666.666.666"; // ip to reach broker
const port = "1883";
const failedMsgs = new Counter('failed_messages');
const failedConnects = new Counter('failed_connects');
const connectRate = new Rate('connect_rate');
// create publisher client
let publisher = new mqtt.Client(
// The list of URL of MQTT server to connect to
[host + ":" + port],
// A username to authenticate to the MQTT server
"",
// Password to match username
"",
// clean session setting
false,
// Client id for reader
k6PubId,
// timeout in ms
connectTimeout,
)
let err;
let connect_status;
// initial connect MUST happen, otherwise test will fail
try {
console.log(`before Connect: ${publisher.isConnected()}`)
connect_status = publisher.connect()
console.info(`init Connect response: ${connect_status}`)
}
catch (error) {
err = error
}
if (err != undefined) {
console.error("publish connect error:", err)
// you may want to use fail here if you want only to test successfull connection only
fail("fatal could not make initial connect to broker for publish")
}
export default function () {
// Message content one per ITER
const k6Message = `{"temperature":20,"timeStamp":${Math.round(new Date().getTime() / 1000)}}`;
console.log(`Current Connect status: ${publisher.isConnected()}`);
connectRate.add(
check(publisher, {
"is publisher connected": publisher => publisher.isConnected()
})
);
if (!publisher.isConnected()) {
failedConnects.add(1);
publisher.connect()
} else {
// publish count messages
let err_publish;
try {
publisher.publish(
// topic to be used
k6Topic,
// The QoS of messages
1,
// Message to be sent
k6Message,
// retain policy on message
false,
// timeout in ms
publishTimeout,
// async publish handlers if needed
(obj) => { // success
console.log(obj.type) // publish
console.log(obj.topic) // published topic
},
(err) => { // failure
console.log(err.type) // error
console.log(err.message)
}
);
} catch (error) {
failedMsgs.add(1);
err_publish = error
}
console.log(err_publish)
}
}
export function teardown() {
console.log("CLOSING SESSION")
// closing both connections at VU close
publisher.close(closeTimeout);
}
Hi, I have upgraded the k6 version to the latest stable version, here's the pull request #29. Please let me know if any other changes are required, thanks.
Hi,
i'am trying the example code from:
https://github.com/pmalhaire/xk6-mqtt/blob/main/examples/test_ssl.js
and I am getting the error "TypeError: Value is not an object: undefined" from the line 49 "let publisher = new mqtt.Client("
Thanks for your help
Samer
We've recently updated the requirements for maintaining an extension within the listing on our site. As such, please address the following items to maintain your listing within the registry:
For more information on these and other listing requirements, please take a look at the registry requirements.
Getting issue while using xk6-mqtt with influxdb:
panic: runtime error: invalid memory address or nil pointer dereference [signal SIGSEGV: segmentation violation code=0x1 addr=0x28 pc=0x9bcbc0] goroutine 97 [running]: github.com/mstoykov/atlas.(*Node).Path(0xc001174100) github.com/mstoykov/[email protected]/atlas.go:77 +0x20 go.k6.io/k6/metrics.(*TagSet).Map(...) go.k6.io/[email protected]/metrics/tags.go:74 go.k6.io/k6/output/influxdb.(*Output).batchFromSamples(0xc00230a100, {0xc001518200, 0x10, 0x11a1720}) go.k6.io/[email protected]/output/influxdb/output.go:129 +0x2fc go.k6.io/k6/output/influxdb.(*Output).flushMetrics.func1() go.k6.io/[email protected]/output/influxdb/output.go:203 +0x12a created by go.k6.io/k6/output/influxdb.(*Output).flushMetrics go.k6.io/[email protected]/output/influxdb/output.go:195 +0x12f
Code:
Scenario file:
import {Counter, Trend } from 'k6/metrics';
const mqtt = require('k6/x/mqtt');
let connectPub_count = new Counter('connectPub_count');
let publish_count = new Counter('publish_count');
let publish_trend = new Trend('publish_time', true);
.
.
.
export function connectPub(k6PubId ,vus_connections) {
let pub_client, err_pub_client
if (k6PubId in vus_connections) {
pub_client = vus_connections[k6PubId];
} else {
try {
let pub_client = mqtt.Client([MQTThost], user, password, cleanSessionSetting, k6PubId, timeout,)
pub_client.connect()
connectPub_count .add(1)
vus_connections[k6PubId] = pub_client;
} catch (error) {
err_pub_client = error;
console.log("connect error:", error)
}
}
check(err_pub_client, { "Connect to topic": err => err === undefined });
return pub_client
}
.
.
.
export function publish(pub_client, k6Message) {
let err_publish;
let startTime = new Date().getTime();
try {
pub_client.publish(k6Topic, QoS, k6Message, retainPolicy, timeout);
publish_trend.add(new Date().getTime() - startTime);
} catch (error) { err_publish = error }
publish_count.add(1)
check(err_publish, { "Sending through MQTT": err => err === undefined });
}
Main load file:
import { randomPayload } from "./common/helper.js";
import {Gauge} from 'k6/metrics';
.
.
.
let vus_connections = {};
let mqtt_count = new Gauge('mqttCount');
.
.
.
export function mqttFlow() {
let k6Message = randomPayload();
const k6PubId = pub-${__VU}
;
let pub_client = mqtt.connectPub(k6PubId, vus_connections)
vus_connections[k6PubId] = pub_client
mqtt.publish(pub_client, k6Message)
mqtt_count.add(ENV.DEVICE_COUNT_MQTT_1);
}
When I disable INFLUX everything works fine
There are some warnings regarding prometheus metric name compliance (https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels):
time="2023-09-13T17:18:01Z" level=warning msg="Metric name should only include ASCII letters, numbers and underscores. This name will stop working in k6 v0.48.0 (around December 2023)." name=mqtt.sent.messages.count
Hi everybody,
i want to write a load test where i have to connect to an mqtt server as several devices with different identities - i load this list if device identities with SharedArray from an CSV file. The file contains device id, device certificate and device key.
Current implementation of client only allows loading certificate and key from file. Handling files in k6 is quite complicated, so i would like to implement an client api to load them from a PEM string value.
Any suggestions how to add this functionality to the existing api?
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.