watermint.org - Takayuki Okazaki's note

Go: 複数のReader/Writerに対する帯域幅制御

ネットワークやディスクへの読み書き処理の際、帯域幅制御をしたい場合があります。低優先度の処理などによって主となるビジネスロジックが阻害されないよう制御するといった目的や、一つのサーバ資源で複数のサービスを提供するときに一つのサービスが資源を消費しすぎないようにしたいといった目的があります。

Goでこのような処理を書きたいとき、ざっと調べてみたところgo-flowrateという実装が見つかりました。

func main() {
    f, _ := os.Open("data.dat")

    // 100バイト/秒の帯域幅制御付きのラッパーをつくる
    f2 := flowrate.NewReader(f, 100)

    // 実際の処理
    // ...
}

使い方もシンプルできっちり制御できるのでこれでいいのですが、制御できるのが一つのio.Readerまたはio.Writerのみに対してのみ可能で、複数のio.Readerio.Writerに対しては制御することができません。

並列処理が必要なプログラムを書いているとちょっとこれではそのまま使えそうにありません。もう少しほかの選択肢を探してもよいのですが、ちょうどいいGoのプログラミング課題ということで新しく実装してみることにしました。

出来上がり

まずはでき上がったライブラリを紹介します。bwlimitとしてGithub上に公開してあります。利用例は次のようなイメージです。

func main() {
    // 100バイト/秒に制限
    bwlimit := NewBwlimit(100, false)

    // 複数のReader
    f1, _ := os.Open("data1.dat")
    f2, _ := os.Open("data2.dat")

    // ラッパーを作成
    fr1 := bwlimit.Reader(f1)
    fr2 := bwlimit.Reader(f2)

    // 実際の処理
    // ...

    // すべてのReaderがClose()されるかEOFになるまで待機
    bwlimit.Wait()
}

まずは帯域幅制御を制御するオブジェクト(上図ではbwlimit)を作り、さらにそれぞれラッパーを作成します。NewBwlimitの第二引数は帯域幅制限しているときRead/Write処理を待たせるかどうかです。

あとはラッパーを普通のio.Readerとして扱うだけです。並列処理をしていて、すべてのReaderまたはWriterについて処理が完了したかどうか知りたくなったので待機するWait()もつくりました。

仕組み

キューを作る方法などいくつか実装方法を検討しましたが最終的にはかなりシンプルな構造になりました。トヨタ生産方式の本を思い出してヒントを得ました。

タクトタイムという一定時間のリズムを刻んで、各拍子のタイミングで利用可能な帯域を現在有効なReader/Writerで分割して利用するアイデアです。

たとえばタクトタイムを1秒間に10回に設定したとします。帯域幅設定を1000バイト/秒にした場合、タクトタイム1回あたりに利用可能な帯域幅は100バイトです。有効なReaderが2つであればこの100バイトをわけあって、50バイトずつの読み取りを許可する枠を設定します。実際に、Reader側が読み取るかどうかはわかりませんが、読み取り可能な上限を制御することでこのような流量制御を行っています。

流量制御と障害防止

あるタクトタイムのときに、読み取りが実際に発生せず50バイト分の枠がまるまるある状態で、次のタクトタイムになったとき読み取り可能枠を50 + 50バイト分にはしないようにしました。流量はこれによってすこし減ってしまいますが、意図しない障害を防ぐためです。

読み取り可能枠が溜まりにたまって一気に転送が行われてしまうと、他のサービスへ影響がでることはもちろん、場合によってはルーターなどのバッファサイズを超えるなどして障害が発生することも考えられます。

利用例

また別の記事として各予定ですが、Dropboxへのファイルアップローダーをいま作っています。このファイルアップローダーで並列してファイルのアップロードをする際に、帯域幅制御をつけたかったのがこのライブラリ作成のモチベーションです。