Skip to main content
版本:Next

HTTP

HTTP Source Connector

配置

使用 HTTP source connector 前,需要进行 server 的配置。

  • 请在 /resource/server-config.yml 中配置 sourceEnabletrue 以开启 source 功能。
  • 请在 /resource/source-config.yml中配置 source connector, 在此仅说明 connectorConfig 下的配置:
    • connectorName, connector 的名称
    • (必需) path, 接口的路径
    • (必需) port, 接口的端口
    • idleTimeout, 空闲 TCP 连接超时时间,单位为秒。超过 idleTimeout 秒没有进行数据接收或发送的连接将会发生超时并被关闭。默认为 0, 不会发生超时。

启动

  1. 启动 EventMesh Runtime
  2. 启动 eventmesh-connector-http

完成后,HTTP source connector 会作为一个 HTTP 服务器对外提供服务。

发送消息

你可以通过 HTTP 向 source connector 发送消息。

connectorConfig:
connectorName: httpSource
path: /test
port: 3755
idleTimeout: 5

上述的例子在source-config.yml中配置了一个 URL http://localhost:3755/test.

你可以按照 cloudevent-spec 中的规定,以binary模式或者structured模式发送消息。

这里是两个例子:

binary模式发送消息。

curl --location --request POST 'http://localhost:3755/test' \
--header 'ce-id: 1' \
--header 'ce-specversion: 1.0' \
--header 'ce-type: com.example.someevent' \
--header 'ce-source: /mycontext' \
--header 'ce-subject: test_topic' \
--header 'Content-Type: text/plain' \
--data-raw 'testdata'

structured模式发送消息。

curl --location --request POST 'http://localhost:3755/test' \
--header 'Content-Type: application/cloudevents+json' \
--data-raw '{
"id": "1",
"specversion": "1.0",
"type": "com.example.someevent",
"source": "/mycontext",
"subject":"test_topic",
"datacontenttype":"text/plain",
"data": "testdata"
}'

HTTP Sink Connector

HTTP sink connector 拥有两种模式:common 和 webhook。

首先,无论是哪种模式,都具备 sinkConnector 的基本功能,即将消息发送给目标 HTTP 服务器。当收到 HTTP 服务器的响应时,common 模式只关心响应的状态码,以判断是否发送成功,而 webook 模式除了关心是否发送成功以外,还将响应的数据(称为回调数据)进行存储,并对外提供回调数据暴露服务。两者的具体差别如下:

  • 默认空闲 TCP 连接超时时间

    common 模式的默认值为 5000ms,webhook 模式则为 15000ms。

  • 响应结果的处理

    common 模式只关心响应的状态码,而 webhook 模式还会存储回调数据,并对外提供回调数据暴露服务。

配置

使用 HTTP sink connector 前,需要进行 sink 的配置。

  • 请在 /resource/server-config.yml 中配置 sinkEnabletrue 以开启 sink 功能。

  • 请在 /resource/sink-config.yml中配置 sink connector,在此仅说明 connectorConfig 下的配置:

    common 模式的最简配置:

    connectorConfig:
    connectorName: httpSink
    urls:
    - http://127.0.0.1:8987/test

    webhook 模式的最简配置:

    connectorConfig:
    connectorName: httpSink
    urls:
    - http://127.0.0.1:8987/test
    webhookConfig:
    activate: true
    port: 8988

HTTP sink connector 的所有配置如下:

属性类型是否必填说明备注
connectorNameStringconnector 名称
urlsList接收消息的 URL 列表HTTP 协议默认端口为 80,HTTPS 协议则为 443
合法 URL 格式为:
http://127.0.0.1:8987/test
http://127.0.0.1/test
https://example.com:4943/test
https://example.com/test
keepAliveBoolean是否使用 HTTP 持久连接默认:true
keepAliveTimeoutIntegerHTTP 持久连接超时时长单位:ms,默认:60000
connectionTimeoutIntegerTCP 连接超时时长单位:ms,默认:5000
idleTimeoutIntegerTCP 空闲超时时长单位:ms,默认:5000(common),15000(webhook)
maxConnectionPoolSizeInteger客户端的最大 HTTP 连接数默认:5
retryConfigObject重试机制的相关配置
retryConfig.maxRetriesInteger最大重试次数不包含首次尝试,默认:2
retryConfig.intervalInteger重试间隔单位:ms,默认:2000
retryConfig.retryOnNonSuccessBoolean是否重试收到非 2xx 响应的请求默认:false,仅重试网络层面的错误请求
webhookConfigObjectWebhook 模式的相关配置
webhookConfig.activateBoolean是否启用 Webhook 模式默认:false
webhookConfig.exportPathString回调数据暴露路径默认:/export
请求方式固定为 GET
请求参数有
type:peek(默认值,获取数据)/poll(获取并删除数据)
pageNum:当 type 为 peek 时,必须大于 0,当 type 为 poll 时,该参数无效
pageSize:必须大于 0
webhookConfig.portInteger是(启用时)回调数据暴露端口
webhookConfig.serverIdleTimeoutInteger暴露回调数据的服务器的 TCP 空闲超时时长单位:ms,默认:5000
webhookConfig.maxStorageSizeInteger存储回调数据队列的最大值默认:5000

启动

  1. 启动 EventMesh Runtime
  2. 启动 eventmesh-connector-http

完成之后,向 EventMesh 发送消息,然后你的消息就会通过 HTTP 请求发生给目标 HTTP 服务器了。如果启用 webhook 模式,则还会同时开启一个 HTTP 服务器对外提供回调数据暴露服务。

数据格式

HTTP sink connector 发送消息的数据格式如下:

  • type:由 connectorName、协议、模式所组成
  • time:发送请求的时间
  • uuid:请求唯一标识,在 webhook 模式中和回调数据结构的 uuid 一一对应
  • eventId:事件 ID,由 type 和 offset 组成
  • data:实际需要发送的数据
{
"type": "httpSink.http.webhook",
"time": "2024-05-09T22:27:02.028",
"uuid": "951d88ee-2503-4df3-a1dd-6e4a6a1c3d3b",
"eventId": "httpSink.http.webhook-0",
"data": ${ConnectRecord}
}

启用 webhook 模式时,则还会同时开启一个 HTTP 服务器对外提供回调数据暴露服务。获取的回调数据格式如下:

  • pageNum:当前页数
  • pageSize:每页的大小
  • pageItems:获取的回调数据数组
  • pageItem 的字段含义如下:
    • data:从回调服务器返回的回调数据
    • metadata:元数据,字段含义如下:
      • url: 获取回调数据的 URL
      • code:请求回调数据得到的响应状态码,如果发生的网络层面的错误(无法收到响应),则为 -1
      • message:请求回调数据得到的响应信息,如果发生的网络层面的错误(无法收到响应),则为相关异常信息
      • uuid:请求回调数据的唯一请求标识,和上面发送消息时所携带的 uuid 一一对应
      • receivedTime:请求回调数据得到的响应时间
      • retryNum:重试次数
      • retriedBy:如果发送重试,则记录被重试的请求的 uuid,否则为 null
{
"pageNum": 1,
"pageSize": 10,
"pageItems": [
{
"data": "callbackData",
"metadata": {
"url": "http://127.0.0.1:8987/test",
"code": 200,
"message": "OK",
"receivedTime": "2024-05-09 22:53:21.556",
"uuid": "fba29061-1a1f-4482-9c83-43ba4e0bcf3f",
"retriedBy": null,
"retryNum": 0
}
}
]
}