生產者流量控制

功能 > 訊息分派功能 > 生產者流量控制

生產者流量控制

在 ActiveMQ Classic 4.x 中,流量控制是使用 TCP 流量控制來實作的。為了強制執行流量控制限制,會暫停受節流的消費者底層網路連線。此策略非常有效率,但如果有多個生產者和消費者共用同一個連線,則可能會導致死鎖。

從 ActiveMQ Classic 5.0 開始,我們現在可以個別對共享連線上的每個生產者進行流量控制,而無需暫停整個連線。所謂的「流量控制」是指,如果代理程式偵測到目的地的記憶體限制,或代理程式的臨時或檔案儲存限制已超出,則可以減慢訊息的流量。生產者將被封鎖直到資源可用將收到 JMSException:此行為是可配置的,並在下面關於 <systemUsage> 的章節中描述。

值得注意的是,預設的 <systemUsage> 設定將導致生產者在達到 memoryLimit<systemUsage> 限制時封鎖:這種封鎖行為有時會被誤解為「生產者卡住」,而實際上,生產者只是盡職地等待直到有空間可用。

  • 同步傳送的訊息會自動使用每個生產者的流量控制;這通常適用於同步傳送的持久訊息,除非您啟用 useAsyncSend 旗標。

  • 使用 非同步傳送的生產者(一般而言,非持久訊息的生產者)不會費心等待代理程式的任何確認;因此,如果超出記憶體限制,您將不會收到通知。如果您確實想知道是否超出代理程式限制,則需要配置 ProducerWindowSize 連線選項,以便即使是非同步訊息也會對每個生產者進行流量控制。

    ActiveMQConnectionFactory connctionFactory = connctionFactory.setProducerWindowSize(1024000);
    

    ProducerWindowSize 是生產者在等待代理程式傳回確認訊息(表示它已接受先前傳送的訊息)之前,將傳輸到代理程式的最大資料位元組數。

或者,如果您傳送的是非持久訊息(預設情況下以非同步方式傳送),並且希望在佇列或主題的記憶體限制被突破時收到通知,則可以簡單地將連線工廠設定為「alwaysSyncSend」。雖然這樣會比較慢,但它將確保您的訊息生產者立即收到記憶體問題的通知。

如果您願意,可以透過在代理程式配置中將適當目的地策略上的 producerFlowControl 旗標設定為 false,來停用特定 JMS 佇列和主題的流量控制,例如:

<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry topic="FOO.>" producerFlowControl="false"/>
    </policyEntries>
  </policyMap>
</destinationPolicy>

請參閱代理程式配置

請注意,由於在 ActiveMQ Classic 5.x 中引入了新的檔案游標,非持久訊息會被轉移到臨時檔案儲存區,以減少用於非持久訊息傳遞的記憶體量。因此,您可能會發現永遠不會達到佇列的 memoryLimit,因為游標不會使用太多記憶體。如果您真的想將所有非持久訊息保留在記憶體中,並在達到限制時停止生產者,則應配置 <vmQueueCursor>

<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">    
  <pendingQueuePolicy>
    <vmQueueCursor/>
  </pendingQueuePolicy>
</policyEntry>

上面的程式碼片段將確保所有非持久佇列訊息都保留在記憶體中,每個佇列的限制為 1Mb。

生產者流量控制如何運作

如果您傳送的是持久訊息(因此會預期收到 OpenWire 訊息的回應),則代理程式會傳送生產者一個 ProducerAck 訊息。這會通知生產者先前的傳送視窗已處理,因此現在可以傳送另一個視窗。它有點像消費者確認,但方向相反。

優點

因此,一個良好的生產者可能會在傳送更多資料之前等待生產者確認,以避免淹沒代理程式(並在發生慢速消費者時強制代理程式封鎖整個連線)。若要查看這在原始碼中如何運作,請查看 ActiveMQMessageProducer 程式碼。

儘管用戶端可以完全忽略生產者 ACK,但如果必須處理慢速消費者,代理程式應該只會停止傳輸;不過,這確實意味著它會停止整個連線。

設定用戶端例外狀況

當代理程式上沒有可用空間時,send() 作業無限期封鎖的替代方案是設定在用戶端上擲回例外狀況。透過將 sendFailIfNoSpace 屬性設定為 true,代理程式將導致 send() 作業失敗,並擲回 javax.jms.ResourceAllocationException,該例外狀況將會傳播到用戶端。以下是此組態的範例

<systemUsage>
 <systemUsage sendFailIfNoSpace="true">
   <memoryUsage>
     <memoryUsage limit="20 mb"/>
   </memoryUsage>
 </systemUsage>
</systemUsage>

此屬性的優點是,用戶端可以捕捉 javax.jms.ResourceAllocationException,稍等片刻,然後重試 send() 作業,而不是無限期地暫停。

從 5.3.1 版開始,已新增 sendFailIfNoSpaceAfterTimeout 屬性。此屬性會導致 send() 作業在用戶端上失敗並擲回例外狀況,但只有在等待指定的時間量之後才會失敗。如果在配置的時間量之後,代理程式上的空間仍然未釋放,則 send() 作業才會失敗,並在用戶端擲回例外狀況。以下是範例

<systemUsage>
 <systemUsage sendFailIfNoSpaceAfterTimeout="3000">
   <memoryUsage>
     <memoryUsage limit="20 mb"/>
   </memoryUsage>
 </systemUsage>
</systemUsage>

逾時是以毫秒定義的,因此上面的範例會在將 send() 作業失敗並向用戶端擲回例外狀況之前,等待三秒鐘。此屬性的優點是,它會封鎖配置的時間量,而不是立即失敗或無限期地封鎖。此屬性不僅可以改善代理程式端,還可以改善用戶端,因此它可以捕捉例外狀況、稍等片刻,然後重試 send() 作業。

從 5.16.0 版開始,可以透過目的地策略,針對每個目的地設定 sendFailIfNoSpacesendFailIfNoSpaceAfterTimeout

停用流量控制

一個常見的要求是停用流量控制,以便訊息分派繼續進行,直到所有可用的磁碟空間都被待處理的訊息用完(無論是設定持久或非持久訊息傳遞)。若要執行此操作,請啟用 訊息游標

系統使用量

您也可以透過 <systemUsage> 元素上的一些屬性來減慢生產者的速度。請查看以下範例

<systemUsage>
  <systemUsage>
    <memoryUsage>
      <memoryUsage limit="64 mb" />
    </memoryUsage>
    <storeUsage>
      <storeUsage limit="100 gb" />
    </storeUsage>
    <tempUsage>
      <tempUsage limit="10 gb" />
    </tempUsage>
  </systemUsage>
</systemUsage>

您可以設定 NON_PERSISTENT 訊息的記憶體限制、PERSITENT 訊息的磁碟儲存空間以及臨時訊息的總使用量限制,代理程式會在減慢生產者的速度之前使用這些限制。使用上面顯示的預設設定,代理程式將會封鎖 send() 呼叫,直到使用某些訊息,且代理程式上有可用的空間。上面顯示的是預設值,您可能需要為您的環境增加這些值。

PercentUsage

StoreUsage 和 TempUsage 都支援 percentLimit 屬性,其中限制是根據可用總量的百分比來決定的。從 5.15.x 版開始,還有一個額外的相關 total 屬性,可用於明確設定可用的總量,以便不會查詢檔案系統。這在只有磁碟分割的一部分可供代理程式使用,或基礎檔案儲存區報告可用容量 > Long.MAX_VALUE(例如:EFS)時很有用,這會導致 java.io.File#getTotalSpace 的 long 傳回值溢位。請注意,當指定 total 時,不會再次根據檔案系統驗證實際可用的資料,只會根據該絕對總量驗證儲存使用量。

Apache、ActiveMQ、Apache ActiveMQ、Apache 羽毛標誌和 Apache ActiveMQ 專案標誌是 Apache 軟體基金會的商標。版權所有 © 2024,Apache 軟體基金會。在 Apache License 2.0 下授權。