Java

couchbase clientを作るときに注意した点

投稿日:

couchbase clientを作るときに注意した点

 

2つ以上のClusterに接続する場合は共通のCouchbaseEnvironmentを使うべき

https://developer.couchbase.com/documentation/server/current/sdk/java/managing-connections.html

そうすることで、thread poolとかが再利用される そうしないと、それぞれのクラスターでthread poolを作り、それらがごちゃまぜに なってしまう可能性があるのではないか、と思います。(そのへんの管理ができないから、ちゃんと制約を守って使ってね) と解釈しました

 

タイムアウト、冗長構成、パフォーマンス、初期化の設定はDefaultCouchbaseEnvironmentで

各種、タイムアウト、冗長構成、パフォーマンス、初期化などの設定は
これで

https://developer.couchbase.com/documentation/server/current/sdk/java/client-settings.html

もし二つ以上bucketをopenするときに注意すること

 

クラスターとバケットは共有すること!
SDKはthread safeだからさらにsynchronizedする必要なし!
クラスターが違えば、envは共通にしろ!

  • Always create only one instance of a CouchbaseCluster and share it across threads (same with buckets).
  • The SDK is thread-safe, so no additional synchronization is needed when interacting with the SDK.
  • If different clusters need to be accessed, reuse the CouchbaseEnvironment (See Scalability and Concurrency).

 

thread-safe(スレッドセーフ)って?

マルチスレッドプログラミングにおける概念である。あるコードがスレッドセーフであるという場合、そのコードを複数のスレッドが同時並行的に実行しても問題が発生しないことを意味する
https://ja.wikipedia.org/wiki/%E3%82%B9%E3%83%AC%E3%83%83%E3%83%89%E3%82%BB%E3%83%BC%E3%83%95

スレッド並列に動かしても、問題ないオブジェクトの作りしてるよ、ってことだよね。

じゃあ、スレッドセーフかどうかの判断基準
この基準は逆に作るときにも基準になる。

広域変数やヒープにアクセスしているかどうか
グローバルな制限のあるリソース(ファイル、プロセスなど)を確保・解放しているかどうか
参照やポインタによる間接アクセスをしているかどうか
明確な副作用があるかどうか(例えば、C言語で volatile 変数にアクセスするなど

ローカルで完結した作りか?リソース

自分でConnection Poolを作る必要があるのかどうか?

https://developer.couchbase.com/documentation/server/3.x/developer/dev-guide-3.0/thread-safety.html
を読むと、
"you will need to implement connection pools and other language-specific thread-safety measures."
とありましたので、
その必要性があるのかを調べました。

この記事を読むと結論としては、シングルトンで共通のクラスターを使っている限り、
自分でプールを実装する必要はない
ということです
https://forums.couchbase.com/t/connection-pooling-in-couchbase-sdk-spymemcached/205/6

https://developer.couchbase.com/documentation/server/3.x/developer/dev-guide-3.0/thread-safety.html
ドキュメントはバージョン3で
すでにドキュメントのバージョン4では上のことは
言及されていません
バージョン3での検索結果ではドキュメントがヒットしますが

バージョン4での検索結果だと、見つかりませんね

ということで自信を持って簡単に実装を済ませたいと思います。

 

バケットハンドラーはthread-safeなのか?

一点気になるこの投稿
バケットハンドラーはthread-safeなのか?

https://forums.couchbase.com/t/does-the-bucket-handler-is-thread-safe/4179/3

という投稿があって、couchbase SDKの開発者が、
スレッドセーフだけど順序は保証されないからね、
と書いていて、get setの順序は保証されないと、書いていますね。
投稿者が入っているのは、get,getでバグっている
と言っていますね。

そもそもBucket handlerとは、独自のhandlerのことを言っているのか、それは不明瞭
つまり、get, getでバグることはない。thread-safeだし、
set,getで順番が意図しないのは当然ある、それはosでも管理しないし、
ということで、
私が開発を進めるにあたり、この投稿は不明瞭で、無視できるものとします。

スレッドセーフか検証してみた

結果、スレッドセーフだったよ

冒頭でも書いたが、オフィシャルドキュメントで言われてるよう
thread-safeでした。しかし、一応、シングルトンで扱うなどしておいたほうが、複数バケット、とかでテストしてないので
避けなトラブルを避けるために、従っておいたほうがいいでしょう。
もし、thread-safeじゃなかったら、オブジェクトを毎回、作り直して操作しようとも思ったが、
20倍以上、パフォーマンスが落ちるのでかなりの負荷でやめるべきと判断した

[scala]

/*
Finding with this Test
- Thread is safe and properly readable.
- 100 times get elapsed time was 119msec
- 100 times get elapsed time was 2558msec (with bucket instanciated each time)
*/
class CouchbaseThreadSafeTest extends FlatSpec with Matchers {
ignore should "read properly and consistently, means the thread and response should match" in {

val cluster = CouchbaseCluster.create("127.0.0.1")
val bucket = cluster.openBucket("default")

class GetThread(threadId: String, bucket: Bucket) extends Thread {
override def run() = {
val doc = bucket.get(threadId, classOf[StringDocument])
println(s"threadId: ${threadId}, get: ${doc.id}, content: ${doc.content}")
assertResult(threadId)(doc.id)
}
}

class PutThread(threadId: String, bucket: Bucket) extends Thread {
override def run() = {
val doc = StringDocument.create(threadId, threadId)
bucket.upsert[StringDocument](doc)
println(s"threadId: ${threadId}, put: ${threadId}")
}
}

println("waiting...")
val loopN = 100
Thread.sleep(2000)
println("start")
1 to loopN foreach { n =>
val t = new PutThread(s"key-${n}", bucket)
t.run()
}
println("waiting...")
Thread.sleep(2000)

val start = System.currentTimeMillis
1 to loopN foreach { n =>
val t = new GetThread(s"key-${n}", bucket)
t.run()
}
println(s"${loopN} times get elapsed time was " + (System.currentTimeMillis - start) + "msec")

println("end")
bucket.close()
cluster.disconnect()
}
}

[/scala]

 

-Java

Copyright© CTOを目指す日記 , 2025 All Rights Reserved.