生產者流量控制
生產者流量控制
在 ActiveMQ Classic 4.x 中,流量控制是使用 TCP 流量控制來實作的。為了強制執行流量控制限制,會暫停受節流的消費者底層網路連線。此策略非常有效率,但如果有多個生產者和消費者共用同一個連線,則可能會導致死鎖。
從 ActiveMQ Classic 5.0 開始,我們現在可以個別對共享連線上的每個生產者進行流量控制,而無需暫停整個連線。所謂的「流量控制」是指,如果代理程式偵測到目的地的記憶體限制,或代理程式的臨時或檔案儲存限制已超出,則可以減慢訊息的流量。生產者將被封鎖直到資源可用或將收到 JMSException:此行為是可配置的,並在下面關於 <systemUsage>
的章節中描述。
值得注意的是,預設的 <systemUsage>
設定將導致生產者在達到 memoryLimit
或 <systemUsage>
限制時封鎖:這種封鎖行為有時會被誤解為「生產者卡住」,而實際上,生產者只是盡職地等待直到有空間可用。
-
同步傳送的訊息會自動使用每個生產者的流量控制;這通常適用於同步傳送的持久訊息,除非您啟用
useAsyncSend
旗標。 -
使用 非同步傳送的生產者(一般而言,非持久訊息的生產者)不會費心等待代理程式的任何確認;因此,如果超出記憶體限制,您將不會收到通知。如果您確實想知道是否超出代理程式限制,則需要配置 ProducerWindowSize 連線選項,以便即使是非同步訊息也會對每個生產者進行流量控制。
ActiveMQConnectionFactory connctionFactory = connctionFactory.setProducerWindowSize(1024000);
ProducerWindowSize 是生產者在等待代理程式傳回確認訊息(表示它已接受先前傳送的訊息)之前,將傳輸到代理程式的最大資料位元組數。
或者,如果您傳送的是非持久訊息(預設情況下以非同步方式傳送),並且希望在佇列或主題的記憶體限制被突破時收到通知,則可以簡單地將連線工廠設定為「alwaysSyncSend」。雖然這樣會比較慢,但它將確保您的訊息生產者立即收到記憶體問題的通知。
如果您願意,可以透過在代理程式配置中將適當目的地策略上的 producerFlowControl
旗標設定為 false,來停用特定 JMS 佇列和主題的流量控制,例如:
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic="FOO.>" producerFlowControl="false"/>
</policyEntries>
</policyMap>
</destinationPolicy>
請參閱代理程式配置。
請注意,由於在 ActiveMQ Classic 5.x 中引入了新的檔案游標,非持久訊息會被轉移到臨時檔案儲存區,以減少用於非持久訊息傳遞的記憶體量。因此,您可能會發現永遠不會達到佇列的 memoryLimit,因為游標不會使用太多記憶體。如果您真的想將所有非持久訊息保留在記憶體中,並在達到限制時停止生產者,則應配置 <vmQueueCursor>
。
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
<pendingQueuePolicy>
<vmQueueCursor/>
</pendingQueuePolicy>
</policyEntry>
上面的程式碼片段將確保所有非持久佇列訊息都保留在記憶體中,每個佇列的限制為 1Mb。
生產者流量控制如何運作
如果您傳送的是持久訊息(因此會預期收到 OpenWire 訊息的回應),則代理程式會傳送生產者一個 ProducerAck 訊息。這會通知生產者先前的傳送視窗已處理,因此現在可以傳送另一個視窗。它有點像消費者確認,但方向相反。
優點
因此,一個良好的生產者可能會在傳送更多資料之前等待生產者確認,以避免淹沒代理程式(並在發生慢速消費者時強制代理程式封鎖整個連線)。若要查看這在原始碼中如何運作,請查看 ActiveMQMessageProducer 程式碼。
儘管用戶端可以完全忽略生產者 ACK,但如果必須處理慢速消費者,代理程式應該只會停止傳輸;不過,這確實意味著它會停止整個連線。
設定用戶端例外狀況
當代理程式上沒有可用空間時,send()
作業無限期封鎖的替代方案是設定在用戶端上擲回例外狀況。透過將 sendFailIfNoSpace
屬性設定為 true
,代理程式將導致 send()
作業失敗,並擲回 javax.jms.ResourceAllocationException
,該例外狀況將會傳播到用戶端。以下是此組態的範例
<systemUsage>
<systemUsage sendFailIfNoSpace="true">
<memoryUsage>
<memoryUsage limit="20 mb"/>
</memoryUsage>
</systemUsage>
</systemUsage>
此屬性的優點是,用戶端可以捕捉 javax.jms.ResourceAllocationException
,稍等片刻,然後重試 send()
作業,而不是無限期地暫停。
從 5.3.1 版開始,已新增 sendFailIfNoSpaceAfterTimeout
屬性。此屬性會導致 send()
作業在用戶端上失敗並擲回例外狀況,但只有在等待指定的時間量之後才會失敗。如果在配置的時間量之後,代理程式上的空間仍然未釋放,則 send()
作業才會失敗,並在用戶端擲回例外狀況。以下是範例
<systemUsage>
<systemUsage sendFailIfNoSpaceAfterTimeout="3000">
<memoryUsage>
<memoryUsage limit="20 mb"/>
</memoryUsage>
</systemUsage>
</systemUsage>
逾時是以毫秒定義的,因此上面的範例會在將 send()
作業失敗並向用戶端擲回例外狀況之前,等待三秒鐘。此屬性的優點是,它會封鎖配置的時間量,而不是立即失敗或無限期地封鎖。此屬性不僅可以改善代理程式端,還可以改善用戶端,因此它可以捕捉例外狀況、稍等片刻,然後重試 send()
作業。
從 5.16.0 版開始,可以透過目的地策略,針對每個目的地設定 sendFailIfNoSpace
和 sendFailIfNoSpaceAfterTimeout
。
停用流量控制
一個常見的要求是停用流量控制,以便訊息分派繼續進行,直到所有可用的磁碟空間都被待處理的訊息用完(無論是設定持久或非持久訊息傳遞)。若要執行此操作,請啟用 訊息游標。
系統使用量
您也可以透過 <systemUsage>
元素上的一些屬性來減慢生產者的速度。請查看以下範例
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="64 mb" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb" />
</storeUsage>
<tempUsage>
<tempUsage limit="10 gb" />
</tempUsage>
</systemUsage>
</systemUsage>
您可以設定 NON_PERSISTENT
訊息的記憶體限制、PERSITENT
訊息的磁碟儲存空間以及臨時訊息的總使用量限制,代理程式會在減慢生產者的速度之前使用這些限制。使用上面顯示的預設設定,代理程式將會封鎖 send()
呼叫,直到使用某些訊息,且代理程式上有可用的空間。上面顯示的是預設值,您可能需要為您的環境增加這些值。
PercentUsage
StoreUsage 和 TempUsage 都支援 percentLimit 屬性,其中限制是根據可用總量的百分比來決定的。從 5.15.x 版開始,還有一個額外的相關 total 屬性,可用於明確設定可用的總量,以便不會查詢檔案系統。這在只有磁碟分割的一部分可供代理程式使用,或基礎檔案儲存區報告可用容量 > Long.MAX_VALUE(例如:EFS)時很有用,這會導致 java.io.File#getTotalSpace 的 long 傳回值溢位。請注意,當指定 total 時,不會再次根據檔案系統驗證實際可用的資料,只會根據該絕對總量驗證儲存使用量。