13 January 2015

上個月我踩到了 Cassandra anti pattern - durable queue,結果 server 後來爆掉了,實在是很慘。後來我們變更了設計,來避開這個問題。今天來分享整件事的來龍去脈,以及目前是怎麼解決的。

Durable queue 是什麼

先來個名詞解釋:Durable queue 是什麼意思?

Queue 裡面放的是訊息,按新增的順序排序。存取的方向是 FIFO 。而 durable 的意思 queue 裡面放的訊息,如果還沒 用掉 (acknowledge),它就不會消失,即使是 server 重開了。所以 durable queue 的實作通常會儲存到 disk。

一個典型的 durable queue 是這樣的操作的:假設一開始 server 裡 queue 存有四個訊息 a,b,c,d。Client 先讀了前三筆 a,b,c 處理,然後回傳給 server 說處理好了 (acknowledge),server 就刪掉 a,b,c。接著 client 再跟 server 問,這時 server 應該要回傳剩下的 d,client 之後用完又再刪除。如此重覆下去。

為方便討論,以下文章寫到 queue 時都是指 durable queue。

Queue 的應用廣泛

Queue 其實在一個應用中很常見的,例如 "有多少封信還沒收下來" 這樣的案例。但是 Cassandra 官方卻告訴我們,它不適合處理這樣的需求。不能滿足常見的需求實在有點糟糕啊。

Cassandra 官方的說法是你該用專屬的 queue 解決方案取代,例如 RabbitMQ 或 ActiveMQ 等等。但就我所知,這類型擅長的方案大多是固定數量的 queue (最多上千個 queue 吧),通常是用在後台流程。它們無法解決用戶端應用的 queue。

回到 "有多少封信還沒收下來" 這個案例,這個應用會有多少個 queue?有多少用戶就有多少個!換句話說,就是百萬以上這個量級。

ps: 這裡提的 queue 數量不是一個 queue 內可以放多少個訊息,而是有多少個不同的 queue 。

用戶導向 Queue 的特徵

  1. Queue 數量跟用戶數一樣多,百萬、千萬以上
  2. 讀取次數比寫入多 (例如常常檢查有沒有新信...etc)
  3. 用戶離線時,queue 必須能自動從記憶體中釋放
  4. 每個 queue 內的訊息量少
  5. client 通常是離線的,非即時

第三點要特別說明一下:如果沒在用的 queue 不從系統中釋放,那麼每個註冊過的用戶,都需要配置資源給 queue,這樣既浪費又撐不住,良好的設計應該只會配置記憶體給最近有使用的 queue。

任務導向 Queue 的特徵

  1. Queue 的數量跟任務數差距不大
  2. 讀取和寫入數通常 一比一
  3. Queue 本身持續保持在記憶體中 (queue 內的訊息則不一定)
  4. 每個 queue 內的訊息量大
  5. Client 與 server 間通常保持持續連線,即時

舉例來說:你的系統要接金流系統,每一次用戶下單都會送給銀行。為了避免漏單,或是調節流量,你會設計一個專門的 queue 來處理所有同類型的交易。像這個例子裡 queue 的總數就不大,例如服務提供三種付費方式,接十家銀行,那麼相乘後 queue 也才 30 個。

用戶導向 Queue 的解決方案

上面提到的兩種 queue 的應用,它們都是 durable queue,但最大的差別在用戶導向是非即時的,而且 queue 數量多,而任務導向的則相反。後者可以用類似 RabbitMQ 方案來解決,但前者呢?我找了很久都沒找到有下列特徵的:

  1. HA - High Availabilty (任何節點掛了,還能讀取訊息,且可 寫入 )
  2. Horizontal scale out (幾千萬用戶數的 queue 自然要可以水平延展,分散在不同節點)
  3. 用戶離線時,queue 必須能自動從記憶體中釋放
  4. 發展成熟

結論就是找不到現成的方案,最後只好自己寫。

以 Redis 實作用戶導向 Queue

參考網路上的做法,大多是直接用 Redis。Redis 的 List 或是 Sorted Set 很適合拿來當 queue 用,而且速度超快。但是 Redis 並不滿足我第三點的要求:要能從記憶體中釋放,除非你自己寫個 background job,每天將閒置過久的 queue 寫到 disk,然後真的要用時還要從 disk 讀進 redis,非常費功夫。另外,現階段 Redis 要同時做到 HA 和 scale out 也是很困難的,它直到 3.0 版才開始有 Redis cluster,現在還在 RC 的階段,效果未知,離成熟還有一段時間。

以資料庫實作用戶導向 Queue

用資料庫去實作用戶導向 queue 是可行的,因為大部份的情況下這個 queue 都是非即時的,等用戶自己來撈。單純用資料庫來做的話,也直接滿足第三點,因為要求訊息時都直接去資料庫查詢,所以沒有配置記憶體的問題,只是查詢會慢一點。我想 Postgres / MySQL 之類的傳統 RDBMS 都是可以勝任的,只差在 HA / scale out 也是很難搞定,不像 NoSQL 中的 Cassandra 這麼簡單。

Cassandra Durable Queue Anti-pattern

有關 RDBMS 當 queue 來用我沒有什麼經驗,我們公司目前部署比較大的 DB 只有 Cassandra,所以只討論 Cassandra 的部份。首先,先大概說明一下 Cassandra 為什麼不能做 durable queue:

  • Cassandra 俱備 high availabilty 的特性。HA 的條件是某些節點掛了,整體服務還能照常運作,包含寫入資料
  • 在運行中掛掉的那個節點,不會收到應該寫入的資料,所以它儲存的資料一直都是錯的。直到它回復後,才能從其他的備份節點修補資料。
  • 修補的過程中,如果是新增的資料不見了,只要從備份裡複製一份回來就好了。但如果是被刪除的資料呢?修補時反而會把該被刪的資料救回。
  • 所以 Cassandra 的設計裡,刪除這個動作會存下來,而不是直接殺掉該筆。這個動作叫做墓碑,tombstone。內部實作是新增一筆 DeletedColumn (參考資料)
  • 刪除資料時的範例:
//假設一開始有四筆
[a, b, c, d]

//後來刪除了 a,b,c,可能會有幾種情況:

// (1) a, b, c 被 DeletedColumn 蓋掉 (用 ' 表示)
[a', b', c', d]
 
// (2) 原來的 a,b,c 還存在別的檔案不動,多了三筆 DeletedColumn
[a', b', c'] [a, b, c, d]
  • 不論刪除後的狀態如何,儲存在 disk 的資料都是變多的,即使只剩下 d 一筆有意義
  • 因此如果用 Cassandra 來實作 durable queue,就會像上面的範例,處理更多的訊息後,實際儲存的資料不減反增,裡面一堆墓碑。假設這個 queue 有過 10000 筆訊息,而其中的 9999 筆已經處理掉了,最後 Cassandra 內會存成:
[a1', a2', a3', ... , a9998', a9999', a10000]
  • 當資料變成這樣後,用戶如果要取得那尚未處理的a10000 ,Cassandra 得先掃過那 9999 筆的墓碑,才可以肯定只剩下一筆,並且回傳給 client。這個繁重的 disk IO 就不用說了。
  • 還沒完,由於 Cassandra 要維持資料的一致性,所以它需要比對各個節點的資料是否一致,所以它從 A 節點查詢出 10000 筆,B 節點也查詢出 10000 筆,然後在 heap 裡比對差異。明明只是為了讀那一筆出來,但是 heap 沒多久就爆了,這實在是太慘了。

以上就是 Cassandra 無法處理 durable queue 的根本原因。整個來龍去脈看下來,始作俑者就是墓碑這個設計,但它也是為了達到 high availability、又要維持一定的完整性 (consistency),才會存在的。CAP 理論中的 CA 互斥關係,迫使系統在設計上做了妥協,也失去了一些能力。

摒棄刪除的概念

用戶導向的 queue 沒有現成的解決方案,想要借助 Cassandra 來實作 durable queue 又窒礙難行。那怎麼辦?山不轉路轉,換個設計看看能不能避開墓碑囉。

那麼就讓 queue 內的訊息一直累積吧,不刪除資料自然就不會有墓碑。我們可以設計一個指標性質的新 table,它用來存放最後 讀取點。讀取點存的是最後一次讀取的訊息 ID (一般是用 Type 1 UUID,按時間順序產生的一種 UUID,精準度到 microsecond) 。

讀取點的使用流程大概是這樣:client 每次要求 queue 內的訊息,都從最後讀取點開始讀出,處理完訊息後,再將讀出訊息中最後一筆當作新的讀取點寫回。

//每次讀三個訊息的話,讀到最後會有四次的讀取點 1,2,3,4
[a, b, c, d, e, f, g, h, i, j, k] //所有訊息
       ^        ^        ^     ^
       1        2        3     4  //讀取點

當然這不是什麼新設計,很多系統的 queue 都是這樣實作的。只是這個作法有違常理:Queue 中處理過的訊息你會想殺掉,因為何必讓用不到的資料繼續佔空間佔資源呢?現在這個額外讀取點的做法,是用空間換取效能,這是為了避開墓碑而做的妥協。

改成這個作法後:

  • 不會踩到 anti pattern 了,可以維持一定的效率
  • 每次讀 queue 裡的訊息,首先要查詢讀取點 table,然後才能查詢訊息 table,反而要兩個 round trip 了,比較慢
  • 訊息會一直累積,佔資源

Table rotation

為了解決無用訊息一直累積的問題,可以將訊息依日期間隔隔開儲存,然後一段時間一口氣刪除已經過期的資料,例如每個月都建一個新 table

CREATE TABLE MyQueue_2014_12 (...)
CREATE TABLE MyQueue_2015_01 (...)
CREATE TABLE MyQueue_2015_02 (...)
...

每當新增訊息到 queue 時,就選擇當月的 table 寫入 。如果 queue 裡的訊息最多只替用戶保持一個月,那麼到了,譬如2015 年 2 月時,就可以將 2014 年 12 月的 table MyQueue_2014_12 直接 drop 掉。因為是整個 drop,沒有墓碑的問題。

Bucket rotation with TTL

除了開新的 table 外,也可以在主鍵這個層級做區隔 -- 透過設立一個新的 column 做為時間的區段 (bucket) 。這個作法比較進階了,可能需要實際用過 Cassandra 才看得懂,不過我盡可能解釋。我們可以設計 queue 的 table 如下:

CREATE TABLE MyQueue (
  user_id TEXT,
  bucket TEXT,   -- '2014_12', '2015_01'... etc
  message_id TIMEUUID,
  data TEXT,
  PRIMARY KEY ((user_id, bucket), message_id)
)

上面範例的主鍵由 user_id (用戶)、bucket (時間的區段)、及訊息的序號 (time based UUID) 三個組成。

本文討論的主題是用戶導向的 queue,所以主鍵會包含用戶的 ID,每個用戶各自存自己的訊息。主鍵中的第二個則是訊息的建立時間所在的區段,如果按月隔開的話,它的值就是 2014_12, 2015_01 ,依此類推,每個月變一次。這兩個欄位設定為 Cassandra 的 partition key,也就是說,同個用戶,而且同個月份的訊息,才會存到同個 Cassandra 的節點上,差任何一個鍵,都有可能放在不同節點。這樣的分配能避免有一台節點資料過度集中,產生 hot spot。

最後一個主鍵是訊息 ID,它的型別是 Type-1 UUID,message_id 這裡設為 cluster key,所以 Cassandra 在儲存時會按照這時間順序存在 disk。而 queue 在讀取時通常也是按時序讀取,當 Cassandra 內部進行查詢時,disk seek 只需一次就可以找到位置,並且用 disk 中最快的循序讀取一次讀完要求的訊息數,非常有效率。

//範例資料:
use_id  |  bucket   | message_id 
----------------------------------------------------------------
'userA'   '2014-12'   71493f30-99ba-11e4-a97b-1387d4d6544b
'userA'   '2014-12'   74ee0440-99ba-11e4-a97b-1387d4d6544b
'userA'   '2015-01'   8d059e80-99ba-11e4-a97b-1387d4d6544b
'userB'   '2014-12'   8eb4ae10-99ba-11e4-a97b-1387d4d6544b

// 查詢的範例,WHERE 要指定當月或上個月的 bucket
SELECT * 
  FROM MyQueue
 WHERE user_id = 'userA'
   AND bucket = '2014-12'
 LIMIT 10  

上面是大概資料的樣子。像這樣的結構就沒辦法像前面 table rotation 的範例那樣,可以一口氣殺掉整個月的過期資料。因為 Cassandra 在刪除時不能只指定部份主鍵,像下面這樣是不行的:

// 不合法的 CQL
DELETE FROM MyQueue WHERE bucket = '2014-12'

不過 Cassandra 有提供 TTL (time to live) 的功能,就是當時間過期了,該筆資料會自動刪除:

// 5356800 = 2 * 31 * 86400秒 = 二個月
INSERT 
  INTO MyQueue 
       (user_id, bucket, message_id, data)    
VALUES ('userA', '2014_12', now(), 'd1..')   
 USING TTL 5356800;

USING TTL 就是指定該筆資料何時會被刪掉,設定 TTL 後,過期的訊息就會自動被刪除,不用花額外的功夫去清理。當然這個作法就會產生墓碑了。不過,由於我們每次查詢時都會指定時間區段 (bucket),只要能夠避開那個會有墓碑的區段就沒事了。

在這個範例裏,queue 裡的訊息只保留一個月,所以查詢訊息時,我們只會查詢本月和上個月的 bucket 而已。前兩個月的 bucket 再也不會碰到。因此即使刪除它而產生了許多墓碑也是無礙。注意這裡的 TTL 要設兩個月 (需求是保留一個月,但 TTL 要設兩倍的區段)

利用 bucket 隔開來避免墓碑的問題是算是進階的 hack,要運用這個技巧你必須非常了解 partition key, cluster key, tombstone 完整的運作原理。本文只做了簡單的說明,它其實有很多眉角的。

Rotation with foward insert

我們靠著按時間 rotation 的作法,巧妙避開墓碑問題,又可清除不要的資料。但是在取得 queue 裡最新訊息的過程中,反而會產生多餘的 round trip,例如遇到跨月的時候:

  • Client 要求 queue 內的訊息,最多 10 筆
  • Server 查詢到最後讀取點是上個月
  • 利用讀取點查詢上個月 bucket 的訊息
  • 上個月 bucket 的訊息回傳 6 筆
  • 因為不足 10 筆,再查詢本月 bucket 的訊息 ,結果取得 2 筆
  • 最後回傳 8 筆

一共查詢了三次才得到全部結果,如果讀取點的 table 也是設計成只新增,不修改,那它也要做 rotation。最差的案例就要查詢四次了。

這裡提供一個小技巧,可以減少 round trip 的次數。就是當你在新增資料時,一次新增兩筆:

// 新增到本月 '2014_12',TTL 是一個月長
INSERT 
  INTO MyQueue 
       (user_id, bucket, message_id, data)    
VALUES ('userA', '2014_12', now(), 'd1..')   
 USING TTL 2678400; -- 一個月

// 同一筆資料也新增到下個月 '2015_01',TTL 是二個月長
INSERT 
  INTO MyQueue 
       (user_id, bucket, message_id, data)    
VALUES ('userA', '2015_01', now(), 'd1..')   
 USING TTL 5356800; -- 二個月

同一筆資料,各新增一筆到本月和下個月的 bucket。這樣一來,每次 client 要求時只要查詢當月的 bucket 即可,因為當月的 bucket 裡已經包含上個月的資料了。注意上例中兩個新增的 TTL 時間長度不一樣。

這個技巧是善用 Cassandra 寫入比讀取還快的特性 (差 10~100倍) -- 反正寫入超快,不如一次寫兩筆來減少讀取時的 round trip 數。這個方法的缺點會佔用兩倍的儲存空間,算是用空間換取時間。如果訊息 table 很大的話,那就不適合套這個技巧。

讀取點在分散式下的問題

在分散式的架構下,尤其是 Cassandra,允許不同的節點同時做寫入,這會造成新增資料不會按時序排列。舉例來說,client 去某個 Cassandra 節點要了 queue 中訊息,例如 [a, b, d],將它們處理完後,正打算把 d 當做新的讀取點寫回時,在別個節點上正好新增了一筆 c,而它比 d 還要舊,排在它前面。結果下次讀取時從 d 開始算起,c 這筆就永遠漏掉了。

這個問題的成因是讀取點是以時間為基礎的設計,但是每個 Cassandra 節點的時鐘卻不可能完美的同步。可能的解法有幾種:一是限制同個 queue 都要在同個節點上新增訊息;二是讀取點加上一點時間的緩衝,既每次查詢時改成從最後讀取點前10秒開始,緩衝時間內如果有重覆的訊息則由應用層解決。

在分散式的前提下要保證訊息按順序新增和讀出是一件超困難的事,即使真的實作出來了也是很慢。我們公司目前暫時選了緩衝時間的解法,而且時間拉長到一分鐘。這不是完美解,但實務上要出現一分鐘的時間誤差機率非常低。

總結

呼~ 這篇已經太長了,就此打住吧,原本應該分兩篇討論的。本文一開始討論不同需求,會有兩大類的 queue 的應用。任務導向的 queue 有解,用戶導向的目前我還找不到好的,尤其還要同時滿足 HA, scale out 等嚴苛的要求。後半的文章則討論了一些替代的方案。Cassandra 雖然 scale 能力很好,但是卻因為墓碑的設計無法勝任 queue 的任務。雖然如此,透過變更設計,將訊息按時間分段處理,還是可以克服墓碑的問題。

最後,本文選擇了 不刪除 來突破困境。而這,又是 Immutability 的另一場勝利,特別是在分散式的環境下。


回響

可以用 Tag <I>、<B>,程式碼請用 <PRE>