HTTP
HTTP Source Connector
配置
使用 HTTP source connector 前,需要进行 server 的配置。
- 请在
/resource/server-config.yml
中配置sourceEnable
为true
以开启 source 功能。 - 请在
/resource/source-config.yml
中配置 source connector, 在此仅说明connectorConfig
下的配置:connectorName
, connector 的名称- (必需)
path
, 接口的路径 - (必需)
port
, 接口的端口 idleTimeout
, 空闲 TCP 连接超时时间,单位为秒。超过idleTimeout
秒没有进行数据接收或发送的连接将会发生超时并被关闭。默认为 0, 不会发生超时。
启动
- 启动 EventMesh Runtime
- 启动 eventmesh-connector-http
完成后,HTTP source connector 会作为一个 HTTP 服务器对外提供服务。
发送消息
你可以通过 HTTP 向 source connector 发送消息。
connectorConfig:
connectorName: httpSource
path: /test
port: 3755
idleTimeout: 5
上述的例子在source-config.yml
中配置了一个 URL http://localhost:3755/test
.
你可以按照 cloudevent-spec 中的规定,以binary
模式或者structured
模式发送消息。
这里是两个例子:
以binary
模式发送消息。
curl --location --request POST 'http://localhost:3755/test' \
--header 'ce-id: 1' \
--header 'ce-specversion: 1.0' \
--header 'ce-type: com.example.someevent' \
--header 'ce-source: /mycontext' \
--header 'ce-subject: test_topic' \
--header 'Content-Type: text/plain' \
--data-raw 'testdata'
以structured
模式发送消息。
curl --location --request POST 'http://localhost:3755/test' \
--header 'Content-Type: application/cloudevents+json' \
--data-raw '{
"id": "1",
"specversion": "1.0",
"type": "com.example.someevent",
"source": "/mycontext",
"subject":"test_topic",
"datacontenttype":"text/plain",
"data": "testdata"
}'
HTTP Sink Connector
HTTP sink connector 拥有两种模式:common 和 webhook。
首先,无论是哪种模式,都具备 sinkConnector 的基本功能,即将消息发送给目标 HTTP 服务器。当收到 HTTP 服务器的响应时,common 模式只关心响应的状态码,以判断是否发送成功,而 webook 模式除了关心是否发送成功以外,还将响应的数据(称为回调数据)进行存储,并对外提供回调数据暴露服务。两者的具体差别如下:
默认空闲 TCP 连接超时时间
common 模式的默认值为 5000ms,webhook 模式则为 15000ms。
响应结果的处理
common 模式只关心响应的状态码,而 webhook 模式还会存储回调数据,并对外提供回调数据暴露服务。
配置
使用 HTTP sink connector 前,需要进行 sink 的配置。
请在
/resource/server-config.yml
中配置sinkEnable
为true
以开启 sink 功能。请在
/resource/sink-config.yml
中配置 sink connector,在此仅说明connectorConfig
下的配置:common 模式的最简配置:
connectorConfig:
connectorName: httpSink
urls:
- http://127.0.0.1:8987/testwebhook 模式的最简配置:
connectorConfig:
connectorName: httpSink
urls:
- http://127.0.0.1:8987/test
webhookConfig:
activate: true
port: 8988
HTTP sink connector 的所有配置如下:
属性 | 类型 | 是否必填 | 说明 | 备注 |
---|---|---|---|---|
connectorName | String | 是 | connector 名称 | |
urls | List | 是 | 接收消息的 URL 列表 | HTTP 协议默认端口为 80,HTTPS 协议则为 443 合法 URL 格式为: http://127.0.0.1:8987/test http://127.0.0.1/test https://example.com:4943/test https://example.com/test |
keepAlive | Boolean | 否 | 是否使用 HTTP 持久连接 | 默认:true |
keepAliveTimeout | Integer | 否 | HTTP 持久连接超时时长 | 单位:ms,默认:60000 |
connectionTimeout | Integer | 否 | TCP 连接超时时长 | 单位:ms,默认:5000 |
idleTimeout | Integer | 否 | TCP 空闲超时时长 | 单位:ms,默认:5000(common),15000(webhook) |
maxConnectionPoolSize | Integer | 否 | 客户端的最大 HTTP 连接数 | 默认:5 |
retryConfig | Object | 重试机制的相关配置 | ||
retryConfig.maxRetries | Integer | 否 | 最大重试次数 | 不包含首次尝试,默认:2 |
retryConfig.interval | Integer | 否 | 重试间隔 | 单位:ms,默认:2000 |
retryConfig.retryOnNonSuccess | Boolean | 否 | 是否重试收到非 2xx 响应的请求 | 默认:false,仅重试网络层面的错误请求 |
webhookConfig | Object | Webhook 模式的相关配置 | ||
webhookConfig.activate | Boolean | 否 | 是否启用 Webhook 模式 | 默认:false |
webhookConfig.exportPath | String | 否 | 回调数据暴露路径 | 默认:/export 请求方式固定为 GET 请求参数有 type:peek(默认值,获取数据)/poll(获取并删除数据) pageNum:当 type 为 peek 时,必须大于 0,当 type 为 poll 时,该参数无效 pageSize:必须大于 0 |
webhookConfig.port | Integer | 是(启用时) | 回调数据暴露端口 | |
webhookConfig.serverIdleTimeout | Integer | 否 | 暴露回调数据的服务器的 TCP 空闲超时时长 | 单位:ms,默认:5000 |
webhookConfig.maxStorageSize | Integer | 否 | 存储回调数据队列的最大值 | 默认:5000 |
启动
- 启动 EventMesh Runtime
- 启动 eventmesh-connector-http
完成之后,向 EventMesh 发送消息,然后你的消息就会通过 HTTP 请求发生给目标 HTTP 服务器了。如果启用 webhook 模式,则还会同时开启一个 HTTP 服务器对外提供回调数据暴露服务。
数据格式
HTTP sink connector 发送消息的数据格式如下:
- type:由 connectorName、协议、模式所组成
- time:发送请求的时间
- uuid:请求唯一标识,在 webhook 模式中和回调数据结构的 uuid 一一对应
- eventId:事件 ID,由 type 和 offset 组成
- data:实际需要发送的数据
{
"type": "httpSink.http.webhook",
"time": "2024-05-09T22:27:02.028",
"uuid": "951d88ee-2503-4df3-a1dd-6e4a6a1c3d3b",
"eventId": "httpSink.http.webhook-0",
"data": ${ConnectRecord}
}
启用 webhook 模式时,则还会同时开启一个 HTTP 服务器对外提供回调数据暴露服务。获取的回调数据格式如下:
- pageNum:当前页数
- pageSize:每页的大小
- pageItems:获取的回调数据数组
- pageItem 的字段含义如下:
- data:从回调服务器返回的回调数据
- metadata:元数据,字段含义如下:
- url: 获取回调数据的 URL
- code:请求回调数据得到的响应状态码,如果发生的网络层面的错误(无法收到响应),则为 -1
- message:请求回调数据得到的响应信息,如果发生的网络层面的错误(无法收到响应),则为相关异常信息
- uuid:请求回调数据的唯一请求标识,和上面发送消息时所携带的 uuid 一一对应
- receivedTime:请求回调数据得到的响应时间
- retryNum:重试次数
- retriedBy:如果发送重试,则记录被重试的请求的 uuid,否则为 null
{
"pageNum": 1,
"pageSize": 10,
"pageItems": [
{
"data": "callbackData",
"metadata": {
"url": "http://127.0.0.1:8987/test",
"code": 200,
"message": "OK",
"receivedTime": "2024-05-09 22:53:21.556",
"uuid": "fba29061-1a1f-4482-9c83-43ba4e0bcf3f",
"retriedBy": null,
"retryNum": 0
}
}
]
}