如何使用memberlist库构建长尾词gossip管理集群并实现高效数据交互?

2026-04-11 06:471阅读0评论SEO基础
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计2715个文字,预计阅读时间需要11分钟。

如何使用memberlist库构建长尾词gossip管理集群并实现高效数据交互?

通过memberlist库实现gossip管理集群以及集群数据交互概述,以下为memberlist库的简单用法:

注意:使用for循环执行list.Join,原因是初始时各节点可能都没有运行,直接执行Join会出现连接拒绝。

javafor (Node node : nodes) { list.Join(node);}

通过memberlist库实现gossip管理集群以及集群数据交互 概述

memberlist库的简单用法如下,注意下面使用for循环来执行list.Join,原因是一开始各节点都没有runing,直接执行Join会出现连接拒绝的错误。

package main import ( "fmt" "github.com/hashicorp/memberlist" "time" ) func main() { /* Create the initial memberlist from a safe configuration. Please reference the godoc for other default config types. godoc.org/github.com/hashicorp/memberlist#Config */ list, err := memberlist.Create(memberlist.DefaultLocalConfig()) if err != nil { panic("Failed to create memberlist: " + err.Error()) } t := time.NewTicker(time.Second * 5) for { select { case <-t.C: // Join an existing cluster by specifying at least one known member. n, err := list.Join([]string{"192.168.80.129"}) if err != nil { fmt.Println("Failed to join cluster: " + err.Error()) continue } fmt.Println("member number is:", n) goto END } } END: for { select { case <-t.C: // Ask for members of the cluster for _, member := range list.Members() { fmt.Printf("Member: %s %s\n", member.Name, member.Addr) } } } // Continue doing whatever you need, memberlist will maintain membership // information in the background. Delegates can be used for receiving // events when members join or leave. }

memberlist的两个主要接口如下:

  1. Create:根据入参配置创建一个Memberlist,初始化阶段Memberlist仅包含本节点状态。注意此时并不会连接到其他节点,执行成功之后就可以允许其他节点加入该memberlist。

  2. Join:使用已有的Memberlist来尝试连接给定的主机,并与之同步状态,以此来加入某个cluster。执行该操作可以让其他节点了解到本节点的存在。最后返回成功建立连接的节点数以及错误信息,如果没有与任何节点建立连接,则返回错误。

    注意当join一个cluster时,至少需要指定集群中的一个已知成员,后续会通过gossip同步整个集群的成员信息。

memberlist提供的功能主要分为两块:维护成员状态(gossip)以及数据同步(boardcast、SendReliable)。下面看几个相关接口。

接口

memberlist.Create的入参要求给出相应的配置信息,DefaultLocalConfig()给出了通用的配置信息,但还需要实现相关接口来实现成员状态的同步以及用户数据的收发。注意下面有些接口是必选的,有些则可选:

type Config struct { // ... // Delegate and Events are delegates for receiving and providing // data to memberlist via callback mechanisms. For Delegate, see // the Delegate interface. For Events, see the EventDelegate interface. // // The DelegateProtocolMin/Max are used to guarantee protocol-compatibility // for any custom messages that the delegate might do (broadcasts, // local/remote state, etc.). If you don't set these, then the protocol // versions will just be zero, and version compliance won't be done. Delegate Delegate Events EventDelegate Conflict ConflictDelegate Merge MergeDelegate Ping PingDelegate Alive AliveDelegate //... }

memberlist使用如下类型的消息来同步集群状态和处理用户消息:

const ( pingMsg messageType = iota indirectPingMsg ackRespMsg suspectMsg aliveMsg deadMsg pushPullMsg compoundMsg userMsg // User mesg, not handled by us compressMsg encryptMsg nackRespMsg hasCrcMsg errMsg ) Delegate

如果要使用memberlist的gossip协议,则必须实现该接口。所有这些方法都必须是线程安全的。

type Delegate interface { // NodeMeta is used to retrieve meta-data about the current node // when broadcasting an alive message. It's length is limited to // the given byte size. This metadata is available in the Node structure. NodeMeta(limit int) []byte // NotifyMsg is called when a user-data message is received. // Care should be taken that this method does not block, since doing // so would block the entire UDP packet receive loop. Additionally, the byte // slice may be modified after the call returns, so it should be copied if needed NotifyMsg([]byte) // GetBroadcasts is called when user data messages can be broadcast. // It can return a list of buffers to send. Each buffer should assume an // overhead as provided with a limit on the total byte size allowed. // The total byte size of the resulting data to send must not exceed // the limit. Care should be taken that this method does not block, // since doing so would block the entire UDP packet receive loop. GetBroadcasts(overhead, limit int) [][]byte // LocalState is used for a TCP Push/Pull. This is sent to // the remote side in addition to the membership information. Any // data can be sent here. See MergeRemoteState as well. The `join` // boolean indicates this is for a join instead of a push/pull. LocalState(join bool) []byte // MergeRemoteState is invoked after a TCP Push/Pull. This is the // state received from the remote side and is the result of the // remote side's LocalState call. The 'join' // boolean indicates this is for a join instead of a push/pull. MergeRemoteState(buf []byte, join bool) }

主要方法如下:

  • NotifyMsg:用于接收用户消息(userMsg)。注意不能阻塞该方法,否则会阻塞整个UDP/TCP报文接收循环。此外由于数据可能在方法调用时被修改,因此应该事先拷贝数据。

    该方法用于接收通过UDP/TCP方式发送的用户消息(userMsg):

    注意UDP方式并不是立即发送的,它会随gossip周期性发送或在处理pingMsg等消息时发送从GetBroadcasts获取到的用户消息。

    //使用UDP方式将用户消息传输到给定节点,消息大小受限于memberlist的UDPBufferSize配置。没有使用gossip机制 func (m *Memberlist) SendBestEffort(to *Node, msg []byte) error //与SendBestEffort机制相同,只不过一个指定了Node,一个指定了Node地址 func (m *Memberlist) SendToAddress(a Address, msg []byte) error //使用TCP方式将用户消息传输到给定节点,消息没有大小限制。没有使用gossip机制 func (m *Memberlist) SendReliable(to *Node, msg []byte) error

  • GetBroadcasts:用于在gossip周期性调度或处理处理pingMsg等消息时携带用户消息,因此并不是即时的。通常会把需要发送的消息通过TransmitLimitedQueue.QueueBroadcast保存起来,然后在发送时通过TransmitLimitedQueue.GetBroadcasts获取需要发送的消息。见下面TransmitLimitedQueue的描述。

  • LocalState:用于TCP Push/Pull,用于向远端发送除成员之外的信息(可以发送任意数据),用于定期同步成员状态。参数join用于表示将该方法用于join阶段,而非push/pull。

  • MergeRemoteState:TCP Push/Pull之后调用,接收到远端的状态(即远端调用LocalState的结果)。参数join用于表示将该方法用于join阶段,而非push/pull。

定期(PushPullInterval)调用pushPull来随机执行一次完整的状态交互。但由于pushPull会与其他节点同步本节点的所有状态,因此代价也比较大。

EventDelegate

仅用于接收成员的joining 和leaving通知,可以用于更新本地的成员状态信息。

type EventDelegate interface { // NotifyJoin is invoked when a node is detected to have joined. // The Node argument must not be modified. NotifyJoin(*Node) // NotifyLeave is invoked when a node is detected to have left. // The Node argument must not be modified. NotifyLeave(*Node) // NotifyUpdate is invoked when a node is detected to have // updated, usually involving the meta data. The Node argument // must not be modified. NotifyUpdate(*Node) }

ChannelEventDelegate实现了简单的EventDelegate接口:

如何使用memberlist库构建长尾词gossip管理集群并实现高效数据交互?

type ChannelEventDelegate struct { Ch chan<- NodeEvent }

ConflictDelegate

用于通知某个client在执行join时产生了命名冲突。通常是因为两个client配置了相同的名称,但使用了不同的地址。可以用于统计错误信息。

type ConflictDelegate interface { // NotifyConflict is invoked when a name conflict is detected NotifyConflict(existing, other *Node) } MergeDelegate

在集群执行merge操作时调用。NotifyMerge方法的参数peers提供了对端成员信息。可以不实现该接口。

type MergeDelegate interface { // NotifyMerge is invoked when a merge could take place. // Provides a list of the nodes known by the peer. If // the return value is non-nil, the merge is canceled. NotifyMerge(peers []*Node) error } PingDelegate

用于通知观察者完成一个ping消息(pingMsg)要花费多长时间。可以在NotifyPingComplete中(使用histogram)统计ping的执行时间。

type PingDelegate interface { // AckPayload is invoked when an ack is being sent; the returned bytes will be appended to the ack AckPayload() []byte // NotifyPing is invoked when an ack for a ping is received NotifyPingComplete(other *Node, rtt time.Duration, payload []byte) } AliveDelegate

当接收到aliveMsg消息时调用的接口,可以用于添加日志和指标等信息。

type AliveDelegate interface { // NotifyAlive is invoked when a message about a live // node is received from the network. Returning a non-nil // error prevents the node from being considered a peer. NotifyAlive(peer *Node) error } Broadcast

可以随gossip将数据广播到memberlist集群。

// Broadcast is something that can be broadcasted via gossip to // the memberlist cluster. type Broadcast interface { // Invalidates checks if enqueuing the current broadcast // invalidates a previous broadcast Invalidates(b Broadcast) bool // Returns a byte form of the message Message() []byte // Finished is invoked when the message will no longer // be broadcast, either due to invalidation or to the // transmit limit being reached Finished() }

Broadcast接口通常作为TransmitLimitedQueue.QueueBroadcast的入参:

func (q *TransmitLimitedQueue) QueueBroadcast(b Broadcast) { q.queueBroadcast(b, 0) }

alertmanager中的实现如下:

type simpleBroadcast []byte func (b simpleBroadcast) Message() []byte { return []byte(b) } func (b simpleBroadcast) Invalidates(memberlist.Broadcast) bool { return false } func (b simpleBroadcast) Finished() TransmitLimitedQueue

TransmitLimitedQueue主要用于处理广播消息。有两个主要的方法:QueueBroadcastGetBroadcasts,前者用于保存广播消息,后者用于在发送的时候获取需要广播的消息。随gossip周期性调度或在处理pingMsg等消息时调用GetBroadcasts方法。

// TransmitLimitedQueue is used to queue messages to broadcast to // the cluster (via gossip) but limits the number of transmits per // message. It also prioritizes messages with lower transmit counts // (hence newer messages). type TransmitLimitedQueue struct { // NumNodes returns the number of nodes in the cluster. This is // used to determine the retransmit count, which is calculated // based on the log of this. NumNodes func() int // RetransmitMult is the multiplier used to determine the maximum // number of retransmissions attempted. RetransmitMult int mu sync.Mutex tq *btree.BTree // stores *limitedBroadcast as btree.Item tm map[string]*limitedBroadcast idGen int64 } 小结

memberlist中的消息分为两种,一种是内部用于同步集群状态的消息,另一种是用户消息。

GossipInterval周期性调度的有两个方法:

  • gossip:用于同步aliveMsgdeadMsgsuspectMsg消息
  • probe:用于使用pingMsg消息探测节点状态

// GossipInterval and GossipNodes are used to configure the gossip // behavior of memberlist. // // GossipInterval is the interval between sending messages that need // to be gossiped that haven't been able to piggyback on probing messages. // If this is set to zero, non-piggyback gossip is disabled. By lowering // this value (more frequent) gossip messages are propagated across // the cluster more quickly at the expense of increased bandwidth. // // GossipNodes is the number of random nodes to send gossip messages to // per GossipInterval. Increasing this number causes the gossip messages // to propagate across the cluster more quickly at the expense of // increased bandwidth. // // GossipToTheDeadTime is the interval after which a node has died that // we will still try to gossip to it. This gives it a chance to refute. GossipInterval time.Duration GossipNodes int GossipToTheDeadTime time.Duration

用户消息又分为两种:

  • 周期性同步:
    • PushPullInterval为周期,使用Delegate.LocalStateDelegate.MergeRemoteState以TCP方式同步用户信息;
    • 使用Delegate.GetBroadcasts随gossip发送用户信息。
  • 主动发送:使用SendReliable等方法实现主动发送用户消息。
alertmanager的处理

alertmanager通过两种方式发送用户消息,即UDP方式和TCP方式。在alertmanager中,当要发送的数据大于MaxGossipPacketSize/2将采用TCP方式(SendReliable方法),否则使用UDP方式(Broadcast接口)。

func (c *Channel) Broadcast(b []byte) { b, err := proto.Marshal(&clusterpb.Part{Key: c.key, Data: b}) if err != nil { return } if OversizedMessage(b) { select { case c.msgc <- b: //从c.msgc 接收数据,并使用SendReliable发送 default: level.Debug(c.logger).Log("msg", "oversized gossip channel full") c.oversizeGossipMessageDroppedTotal.Inc() } } else { c.send(b) } } func OversizedMessage(b []byte) bool { return len(b) > MaxGossipPacketSize/2 } demo

这里实现了一个简单的基于gossip管理集群信息,并通过TCP给集群成员发送信息的例子。

本文共计2715个文字,预计阅读时间需要11分钟。

如何使用memberlist库构建长尾词gossip管理集群并实现高效数据交互?

通过memberlist库实现gossip管理集群以及集群数据交互概述,以下为memberlist库的简单用法:

注意:使用for循环执行list.Join,原因是初始时各节点可能都没有运行,直接执行Join会出现连接拒绝。

javafor (Node node : nodes) { list.Join(node);}

通过memberlist库实现gossip管理集群以及集群数据交互 概述

memberlist库的简单用法如下,注意下面使用for循环来执行list.Join,原因是一开始各节点都没有runing,直接执行Join会出现连接拒绝的错误。

package main import ( "fmt" "github.com/hashicorp/memberlist" "time" ) func main() { /* Create the initial memberlist from a safe configuration. Please reference the godoc for other default config types. godoc.org/github.com/hashicorp/memberlist#Config */ list, err := memberlist.Create(memberlist.DefaultLocalConfig()) if err != nil { panic("Failed to create memberlist: " + err.Error()) } t := time.NewTicker(time.Second * 5) for { select { case <-t.C: // Join an existing cluster by specifying at least one known member. n, err := list.Join([]string{"192.168.80.129"}) if err != nil { fmt.Println("Failed to join cluster: " + err.Error()) continue } fmt.Println("member number is:", n) goto END } } END: for { select { case <-t.C: // Ask for members of the cluster for _, member := range list.Members() { fmt.Printf("Member: %s %s\n", member.Name, member.Addr) } } } // Continue doing whatever you need, memberlist will maintain membership // information in the background. Delegates can be used for receiving // events when members join or leave. }

memberlist的两个主要接口如下:

  1. Create:根据入参配置创建一个Memberlist,初始化阶段Memberlist仅包含本节点状态。注意此时并不会连接到其他节点,执行成功之后就可以允许其他节点加入该memberlist。

  2. Join:使用已有的Memberlist来尝试连接给定的主机,并与之同步状态,以此来加入某个cluster。执行该操作可以让其他节点了解到本节点的存在。最后返回成功建立连接的节点数以及错误信息,如果没有与任何节点建立连接,则返回错误。

    注意当join一个cluster时,至少需要指定集群中的一个已知成员,后续会通过gossip同步整个集群的成员信息。

memberlist提供的功能主要分为两块:维护成员状态(gossip)以及数据同步(boardcast、SendReliable)。下面看几个相关接口。

接口

memberlist.Create的入参要求给出相应的配置信息,DefaultLocalConfig()给出了通用的配置信息,但还需要实现相关接口来实现成员状态的同步以及用户数据的收发。注意下面有些接口是必选的,有些则可选:

type Config struct { // ... // Delegate and Events are delegates for receiving and providing // data to memberlist via callback mechanisms. For Delegate, see // the Delegate interface. For Events, see the EventDelegate interface. // // The DelegateProtocolMin/Max are used to guarantee protocol-compatibility // for any custom messages that the delegate might do (broadcasts, // local/remote state, etc.). If you don't set these, then the protocol // versions will just be zero, and version compliance won't be done. Delegate Delegate Events EventDelegate Conflict ConflictDelegate Merge MergeDelegate Ping PingDelegate Alive AliveDelegate //... }

memberlist使用如下类型的消息来同步集群状态和处理用户消息:

const ( pingMsg messageType = iota indirectPingMsg ackRespMsg suspectMsg aliveMsg deadMsg pushPullMsg compoundMsg userMsg // User mesg, not handled by us compressMsg encryptMsg nackRespMsg hasCrcMsg errMsg ) Delegate

如果要使用memberlist的gossip协议,则必须实现该接口。所有这些方法都必须是线程安全的。

type Delegate interface { // NodeMeta is used to retrieve meta-data about the current node // when broadcasting an alive message. It's length is limited to // the given byte size. This metadata is available in the Node structure. NodeMeta(limit int) []byte // NotifyMsg is called when a user-data message is received. // Care should be taken that this method does not block, since doing // so would block the entire UDP packet receive loop. Additionally, the byte // slice may be modified after the call returns, so it should be copied if needed NotifyMsg([]byte) // GetBroadcasts is called when user data messages can be broadcast. // It can return a list of buffers to send. Each buffer should assume an // overhead as provided with a limit on the total byte size allowed. // The total byte size of the resulting data to send must not exceed // the limit. Care should be taken that this method does not block, // since doing so would block the entire UDP packet receive loop. GetBroadcasts(overhead, limit int) [][]byte // LocalState is used for a TCP Push/Pull. This is sent to // the remote side in addition to the membership information. Any // data can be sent here. See MergeRemoteState as well. The `join` // boolean indicates this is for a join instead of a push/pull. LocalState(join bool) []byte // MergeRemoteState is invoked after a TCP Push/Pull. This is the // state received from the remote side and is the result of the // remote side's LocalState call. The 'join' // boolean indicates this is for a join instead of a push/pull. MergeRemoteState(buf []byte, join bool) }

主要方法如下:

  • NotifyMsg:用于接收用户消息(userMsg)。注意不能阻塞该方法,否则会阻塞整个UDP/TCP报文接收循环。此外由于数据可能在方法调用时被修改,因此应该事先拷贝数据。

    该方法用于接收通过UDP/TCP方式发送的用户消息(userMsg):

    注意UDP方式并不是立即发送的,它会随gossip周期性发送或在处理pingMsg等消息时发送从GetBroadcasts获取到的用户消息。

    //使用UDP方式将用户消息传输到给定节点,消息大小受限于memberlist的UDPBufferSize配置。没有使用gossip机制 func (m *Memberlist) SendBestEffort(to *Node, msg []byte) error //与SendBestEffort机制相同,只不过一个指定了Node,一个指定了Node地址 func (m *Memberlist) SendToAddress(a Address, msg []byte) error //使用TCP方式将用户消息传输到给定节点,消息没有大小限制。没有使用gossip机制 func (m *Memberlist) SendReliable(to *Node, msg []byte) error

  • GetBroadcasts:用于在gossip周期性调度或处理处理pingMsg等消息时携带用户消息,因此并不是即时的。通常会把需要发送的消息通过TransmitLimitedQueue.QueueBroadcast保存起来,然后在发送时通过TransmitLimitedQueue.GetBroadcasts获取需要发送的消息。见下面TransmitLimitedQueue的描述。

  • LocalState:用于TCP Push/Pull,用于向远端发送除成员之外的信息(可以发送任意数据),用于定期同步成员状态。参数join用于表示将该方法用于join阶段,而非push/pull。

  • MergeRemoteState:TCP Push/Pull之后调用,接收到远端的状态(即远端调用LocalState的结果)。参数join用于表示将该方法用于join阶段,而非push/pull。

定期(PushPullInterval)调用pushPull来随机执行一次完整的状态交互。但由于pushPull会与其他节点同步本节点的所有状态,因此代价也比较大。

EventDelegate

仅用于接收成员的joining 和leaving通知,可以用于更新本地的成员状态信息。

type EventDelegate interface { // NotifyJoin is invoked when a node is detected to have joined. // The Node argument must not be modified. NotifyJoin(*Node) // NotifyLeave is invoked when a node is detected to have left. // The Node argument must not be modified. NotifyLeave(*Node) // NotifyUpdate is invoked when a node is detected to have // updated, usually involving the meta data. The Node argument // must not be modified. NotifyUpdate(*Node) }

ChannelEventDelegate实现了简单的EventDelegate接口:

如何使用memberlist库构建长尾词gossip管理集群并实现高效数据交互?

type ChannelEventDelegate struct { Ch chan<- NodeEvent }

ConflictDelegate

用于通知某个client在执行join时产生了命名冲突。通常是因为两个client配置了相同的名称,但使用了不同的地址。可以用于统计错误信息。

type ConflictDelegate interface { // NotifyConflict is invoked when a name conflict is detected NotifyConflict(existing, other *Node) } MergeDelegate

在集群执行merge操作时调用。NotifyMerge方法的参数peers提供了对端成员信息。可以不实现该接口。

type MergeDelegate interface { // NotifyMerge is invoked when a merge could take place. // Provides a list of the nodes known by the peer. If // the return value is non-nil, the merge is canceled. NotifyMerge(peers []*Node) error } PingDelegate

用于通知观察者完成一个ping消息(pingMsg)要花费多长时间。可以在NotifyPingComplete中(使用histogram)统计ping的执行时间。

type PingDelegate interface { // AckPayload is invoked when an ack is being sent; the returned bytes will be appended to the ack AckPayload() []byte // NotifyPing is invoked when an ack for a ping is received NotifyPingComplete(other *Node, rtt time.Duration, payload []byte) } AliveDelegate

当接收到aliveMsg消息时调用的接口,可以用于添加日志和指标等信息。

type AliveDelegate interface { // NotifyAlive is invoked when a message about a live // node is received from the network. Returning a non-nil // error prevents the node from being considered a peer. NotifyAlive(peer *Node) error } Broadcast

可以随gossip将数据广播到memberlist集群。

// Broadcast is something that can be broadcasted via gossip to // the memberlist cluster. type Broadcast interface { // Invalidates checks if enqueuing the current broadcast // invalidates a previous broadcast Invalidates(b Broadcast) bool // Returns a byte form of the message Message() []byte // Finished is invoked when the message will no longer // be broadcast, either due to invalidation or to the // transmit limit being reached Finished() }

Broadcast接口通常作为TransmitLimitedQueue.QueueBroadcast的入参:

func (q *TransmitLimitedQueue) QueueBroadcast(b Broadcast) { q.queueBroadcast(b, 0) }

alertmanager中的实现如下:

type simpleBroadcast []byte func (b simpleBroadcast) Message() []byte { return []byte(b) } func (b simpleBroadcast) Invalidates(memberlist.Broadcast) bool { return false } func (b simpleBroadcast) Finished() TransmitLimitedQueue

TransmitLimitedQueue主要用于处理广播消息。有两个主要的方法:QueueBroadcastGetBroadcasts,前者用于保存广播消息,后者用于在发送的时候获取需要广播的消息。随gossip周期性调度或在处理pingMsg等消息时调用GetBroadcasts方法。

// TransmitLimitedQueue is used to queue messages to broadcast to // the cluster (via gossip) but limits the number of transmits per // message. It also prioritizes messages with lower transmit counts // (hence newer messages). type TransmitLimitedQueue struct { // NumNodes returns the number of nodes in the cluster. This is // used to determine the retransmit count, which is calculated // based on the log of this. NumNodes func() int // RetransmitMult is the multiplier used to determine the maximum // number of retransmissions attempted. RetransmitMult int mu sync.Mutex tq *btree.BTree // stores *limitedBroadcast as btree.Item tm map[string]*limitedBroadcast idGen int64 } 小结

memberlist中的消息分为两种,一种是内部用于同步集群状态的消息,另一种是用户消息。

GossipInterval周期性调度的有两个方法:

  • gossip:用于同步aliveMsgdeadMsgsuspectMsg消息
  • probe:用于使用pingMsg消息探测节点状态

// GossipInterval and GossipNodes are used to configure the gossip // behavior of memberlist. // // GossipInterval is the interval between sending messages that need // to be gossiped that haven't been able to piggyback on probing messages. // If this is set to zero, non-piggyback gossip is disabled. By lowering // this value (more frequent) gossip messages are propagated across // the cluster more quickly at the expense of increased bandwidth. // // GossipNodes is the number of random nodes to send gossip messages to // per GossipInterval. Increasing this number causes the gossip messages // to propagate across the cluster more quickly at the expense of // increased bandwidth. // // GossipToTheDeadTime is the interval after which a node has died that // we will still try to gossip to it. This gives it a chance to refute. GossipInterval time.Duration GossipNodes int GossipToTheDeadTime time.Duration

用户消息又分为两种:

  • 周期性同步:
    • PushPullInterval为周期,使用Delegate.LocalStateDelegate.MergeRemoteState以TCP方式同步用户信息;
    • 使用Delegate.GetBroadcasts随gossip发送用户信息。
  • 主动发送:使用SendReliable等方法实现主动发送用户消息。
alertmanager的处理

alertmanager通过两种方式发送用户消息,即UDP方式和TCP方式。在alertmanager中,当要发送的数据大于MaxGossipPacketSize/2将采用TCP方式(SendReliable方法),否则使用UDP方式(Broadcast接口)。

func (c *Channel) Broadcast(b []byte) { b, err := proto.Marshal(&clusterpb.Part{Key: c.key, Data: b}) if err != nil { return } if OversizedMessage(b) { select { case c.msgc <- b: //从c.msgc 接收数据,并使用SendReliable发送 default: level.Debug(c.logger).Log("msg", "oversized gossip channel full") c.oversizeGossipMessageDroppedTotal.Inc() } } else { c.send(b) } } func OversizedMessage(b []byte) bool { return len(b) > MaxGossipPacketSize/2 } demo

这里实现了一个简单的基于gossip管理集群信息,并通过TCP给集群成员发送信息的例子。