Never too late to learn.

0%

MQTT协议在STM32上的移植

paho.jpg

The Eclipse Paho project provides open-source client implementations of MQTT and MQTT-SN messaging protocols aimed at new, existing, and emerging applications for the Internet of Things (IoT).

开发平台

软件环境

  • Keil uVision 5.12
  • STM32F10x_StdPeriph_Lib_V3.5.0 标准库

硬件环境

  • stm32f103系列
  • SIM800C无线通讯GSM模块

参考文档

下载MQTT 嵌入式C/C++库

本篇是MQTT在STM32设备上的移植,因此使用paho的嵌入式C库,源码地址https://github.com/eclipse/paho.mqtt.embedded-c.

移植

首先,需要将../paho.mqtt.embedded-c/MQTTPacket/中的所有文件添加到keil工程文件,然后参考../paho.mqtt.embedded-c/MQTTPacket/samples/中的例程编写。

移植的前提是保证已经建立TCP连接,因为MQTT协议是建立在TCP基础上的数据传输协议。本实验使用sim800c无线通讯模块先与MQTT服务器建立TCP连接(使用透传模式),再通过串口与stm32建立通讯联系。

接口函数

移植主要参考并修改库中的../paho.mqtt.embedded-c/MQTTPacket/samples/transport.c例程文件,主要涉及transport_sendPacketBuffer()和transport_getdata()两个函数,它们分别作为硬件发送和接收数据包的接口函数,因此需要针对硬件平台做相应修改。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Transport hook: push a serialized MQTT packet out over UART4.
 *
 * @param buf     Bytes to transmit.
 * @param buflen  Number of bytes in buf to transmit.
 * @return Always 0; whatever SendBuffer() reports (if anything) is not
 *         inspected here.
 */
int32_t transport_sendPacketBuffer(uint8_t* buf, uint32_t buflen)
{
    SendBuffer(UART4, buf, buflen);

    return 0;
}

/**
 * Transport hook: hand the next `count` received bytes to the MQTT packet
 * reader.
 *
 * Bytes are copied out of RxBuffer4 starting at the running read offset
 * readBufLen, which is then advanced by `count`.
 *
 * NOTE(review): nothing here verifies that `count` bytes have actually been
 * received or that readBufLen + count stays inside RxBuffer4; the buffer
 * capacity is not visible from this function. Confirm against the UART4
 * receive code and add an upper-bound check there or here.
 *
 * @param buf    Destination buffer; must hold at least `count` bytes.
 * @param count  Number of bytes requested.
 * @return `count` on success, -1 if `buf` is NULL or `count` is negative.
 */
int32_t transport_getdata(uint8_t* buf, int32_t count)
{
    /* A negative count would be converted to a huge size_t by memcpy(). */
    if (buf == NULL || count < 0)
    {
        return -1;
    }

    memcpy(buf, (const void *)&RxBuffer4[readBufLen], (size_t)count);
    readBufLen += count;

    return count;
}

连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
 * Send an MQTT CONNECT packet and wait for the broker's CONNACK.
 *
 * The client id, credentials, keep-alive interval and protocol version are
 * hard-coded below. The receive buffer is reset right after the CONNECT has
 * been handed to the transport so that the CONNACK is read from its start.
 *
 * @return 1 when a CONNACK with return code 0 was received, 0 otherwise
 *         (serialization failure, no CONNACK, malformed CONNACK, or the
 *         broker refused the connection). This matches the 1 = success /
 *         0 = failure convention used by mqtt_subscribe().
 */
uint8_t mqtt_connect(void)
{
    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
    uint8_t buf[200];
    int buflen = (int)sizeof(buf);
    unsigned char sessionPresent = 0;
    /* Pre-set so a failed deserialization never prints an indeterminate value. */
    unsigned char connack_rc = 0xFF;
    int len;
    int rc;

    data.clientID.cstring = "yao";
    data.keepAliveInterval = 20; /* seconds */
    data.cleansession = 1;
    data.username.cstring = "nncy";
    data.password.cstring = "nncy";
    data.MQTTVersion = 4; /* protocol level 4, i.e. MQTT 3.1.1 */

    len = MQTTSerialize_connect((unsigned char *)buf, buflen, &data);
    if (len <= 0)
    {
        /* A non-positive length must not reach the transport, whose length
         * parameter is unsigned. */
        printf("MQTT connect serialize failed: %d\r\n", len);
        return 0;
    }

    rc = transport_sendPacketBuffer(buf, (uint32_t)len);
    printf("len:%d|rc:%d\r\n", len, rc);

    reset_receive_buf();

    /* wait for connack */
    if (MQTTPacket_read(buf, buflen, transport_getdata) != CONNACK)
    {
        printf("no connack\n");
        return 0;
    }

    if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0)
    {
        printf("Unable to connect, return code %d\n", connack_rc);
        return 0;
    }

    printf("MQTT CONNECT!\n");
    return 1;
}

订阅和发布函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
 * Publish a NUL-terminated text message to a topic (QoS 0, not retained).
 *
 * The packet is serialized into a 200-byte stack buffer, so the topic and
 * payload together must fit in that space; otherwise nothing is sent.
 *
 * @param pPubTopic  NUL-terminated topic name.
 * @param pMessage   NUL-terminated payload; its strlen() is the payload size.
 */
void mqtt_publish(char *pPubTopic, char *pMessage)
{
    uint8_t buf[200];
    MQTTString topicString = MQTTString_initializer;
    int len;

    if (pPubTopic == NULL || pMessage == NULL)
    {
        return;
    }

    topicString.cstring = pPubTopic;
    len = MQTTSerialize_publish((unsigned char *)buf, (int)sizeof(buf), 0, 0, 0, 0,
                                topicString, (unsigned char *)pMessage, (int)strlen(pMessage));
    if (len <= 0)
    {
        /* A non-positive length must not reach the transport, whose length
         * parameter is unsigned. */
        printf("MQTT publish serialize failed: %d\r\n", len);
        return;
    }

    (void)transport_sendPacketBuffer(buf, (uint32_t)len);
}

/**
 * Subscribe to a single topic at QoS 0 and wait for the broker's SUBACK.
 *
 * @param pSubTopic  NUL-terminated topic filter.
 * @return 1 when a SUBACK granting QoS 0 was received; 0 on a NULL topic,
 *         serialization failure, missing or malformed SUBACK, or any other
 *         granted QoS value.
 */
uint8_t mqtt_subscribe(char *pSubTopic)
{
    uint8_t buf[200] = {0};
    int buflen = (int)sizeof(buf);
    unsigned short msgid = 1;
    int req_qos = 0;
    int len;
    MQTTString topicString = MQTTString_initializer;

    if (pSubTopic == NULL)
    {
        return 0;
    }
    topicString.cstring = pSubTopic;

    len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos);
    if (len <= 0)
    {
        /* A non-positive length must not reach the transport, whose length
         * parameter is unsigned. */
        printf("MQTT subscribe serialize failed: %d\r\n", len);
        return 0;
    }

    (void)transport_sendPacketBuffer(buf, (uint32_t)len);

    reset_receive_buf();
    if (MQTTPacket_read(buf, buflen, transport_getdata) == SUBACK) /* wait for suback */
    {
        unsigned short submsgid = 0;
        int subcount = 0;
        /* -1 is reported if the SUBACK cannot be deserialized. */
        int granted_qos = -1;

        if (MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen) != 1
            || granted_qos != 0)
        {
            printf("granted qos != 0, %d\n", granted_qos);
            return 0;
        }

        printf("granted qos = 0\n");
        return 1;
    }

    printf("no suback received!\r\n");
    return 0;
}

接收数据包解析函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/*
 * Locate the topic and payload inside a PUBLISH-shaped packet that carries
 * no packet identifier (QoS 0 layout).
 *
 * Decodes the variable-length Remaining Length field (1-4 bytes) and the
 * 16-bit big-endian topic length, and checks every offset against `len`.
 * The payload starts at *topic_off + *topic_len.
 *
 * Returns true and fills the outputs when the packet is consistent,
 * false otherwise.
 */
static bool mqtt_locate_publish_fields(const uint8_t *pkt, size_t len,
                                       size_t *topic_off, size_t *topic_len,
                                       size_t *payload_len)
{
    size_t pos = 1; /* first Remaining Length byte follows the header byte */
    size_t remaining = 0;
    size_t multiplier = 1;
    uint8_t encoded;

    do
    {
        if (pos >= len || pos > 4)
        {
            return false; /* truncated, or more than 4 length bytes */
        }
        encoded = pkt[pos++];
        remaining += (size_t)(encoded & 0x7F) * multiplier;
        multiplier *= 128;
    } while ((encoded & 0x80) != 0);

    /* Need at least the 2-byte topic length, and the whole body must be present. */
    if (remaining < 2 || remaining > len - pos)
    {
        return false;
    }

    *topic_len = ((size_t)pkt[pos] << 8) | (size_t)pkt[pos + 1];
    if (*topic_len > remaining - 2)
    {
        return false;
    }

    *topic_off = pos + 2;
    *payload_len = remaining - 2 - *topic_len;
    return true;
}

/**
 * Inspect the data currently held in the UART4 receive buffer.
 *
 * - If the data contains the text "CLOSED", GPRS_CIP_MODE is cleared and
 *   false is returned.
 * - If the first byte is 0xD0 it is reported as a heartbeat and the outputs
 *   are left untouched.
 * - Otherwise the data is parsed as a PUBLISH packet without a packet
 *   identifier; the topic and payload are copied out as NUL-terminated
 *   strings. A packet whose length fields are inconsistent is reported and
 *   skipped, leaving the outputs untouched.
 *
 * At most 255 bytes of received data are examined.
 *
 * NOTE(review): the capacities of Topic and message are not passed in, so the
 * copies cannot be bounded here. Each must hold up to 254 bytes (253 data
 * bytes plus the terminator) — confirm against the callers.
 *
 * @param Topic    Output: NUL-terminated topic name; must not be NULL.
 * @param message  Output: NUL-terminated payload; must not be NULL.
 * @return false only when "CLOSED" was found; true in every other case.
 */
bool MQTT_Received_Data_Parser(char *Topic, char *message)
{
    uint8_t buf[256];
    size_t len = (size_t)uart4RxBufLen;
    size_t topic_off = 0;
    size_t topic_len = 0;
    size_t payload_len = 0;

    printf("Received data lenth:%d\r\n", (int)len);

    /* Work on a bounded, NUL-terminated snapshot so the string functions and
     * %s below cannot run past the received data. */
    if (len > sizeof(buf) - 1)
    {
        len = sizeof(buf) - 1;
    }
    memcpy(buf, (const void *)&RxBuffer4[0], len);
    buf[len] = '\0';

    if (strstr((const char *)buf, "CLOSED") != NULL)
    {
        GPRS_CIP_MODE = 0;
        return false;
    }

    printf("buf:%s\r\n", buf);

    if (len > 0 && 0xD0 == buf[0])
    {
        printf("MQTT Heart Beat \r\n");
        return true;
    }

    if (!mqtt_locate_publish_fields(buf, len, &topic_off, &topic_len, &payload_len))
    {
        printf("Malformed MQTT packet\r\n");
        return true;
    }

    printf("topicLength:%d\r\n", (int)topic_len);
    printf("dataLength:%d\r\n", (int)payload_len);

    memcpy(Topic, buf + topic_off, topic_len);
    Topic[topic_len] = '\0';
    printf("Topic: %s\r\n", Topic);

    memcpy(message, buf + topic_off + topic_len, payload_len);
    message[payload_len] = '\0';
    printf("Message: %s\r\n", message);

    return true;
}

blog/mqtt/mqtt.jpg:
mqtt.jpg
blog/mqtt/mqtt1.jpg:
mqtt1.jpg
blog/mqtt/mqtt2.jpg:
mqtt2.jpg
blog/mqtt/mqtt3.jpg:
mqtt3.jpg
blog/mqtt/mqtt4.jpg:
mqtt4.jpg

参考链接

Coffee? ☕