ストリームデコレータリファレンス
cartesianProduct
cartesianProduct関数は、多値フィールド(つまり、配列)を持つ単一のタプルを、配列内の値ごとに複数のタプルに変換します。つまり、fieldA の N 個の値の配列を含む単一のタプルが与えられると、cartesianProduct関数は N 個のタプルを出力します。各タプルには、元のタプルの配列からの1つの値が含まれます。本質的に、さらなる処理のために配列をフラット化できます。
たとえば、cartesianProductを使用すると、このタプルを
{
"fieldA": "foo",
"fieldB": ["bar","baz","bat"]
}
次の 3 つのタプルに変換できます
{
"fieldA": "foo",
"fieldB": "bar"
}
{
"fieldA": "foo",
"fieldB": "baz"
}
{
"fieldA": "foo",
"fieldB": "bat"
}
cartesianProduct のパラメータ
-
incoming stream:(必須)単一の入力ストリーム。 -
fieldName または evaluator:(必須)値をフラット化するフィールドの名前、または結果がフラット化される評価器。 -
productSort='fieldName ASC|DESC':(オプション)新しく生成されたタプルのソート順。
cartesianProduct の構文
cartesianProduct(
<stream>,
<fieldName | evaluator> [as newFieldName],
productSort='fieldName ASC|DESC'
)
cartesianProduct の例
次の例は、このソースタプルのさまざまな出力を示しています
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3]
}
単一フィールド、ソートなし
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
fieldB
)
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": [1,2,3]
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": [1,2,3]
}
単一評価器、ソートなし
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
sequence(3,4,5) as fieldE
)
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3],
"fieldE": 4
}
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3],
"fieldE": 9
}
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3],
"fieldE": 14
}
単一フィールド、値でソート
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
fieldB,
productSort="fieldB desc"
)
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": [1,2,3]
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": [1,2,3]
}
単一評価器、評価器の値でソート
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
sequence(3,4,5) as fieldE,
productSort="newFieldE desc"
)
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3],
"fieldE": 14
}
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3],
"fieldE": 9
}
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3],
"fieldE": 4
}
名前変更された単一フィールド、値でソート
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
fieldB as newFieldB,
productSort="fieldB desc"
)
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3]
"newFieldB": "valueB2",
}
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3]
"newFieldB": "valueB1",
}
複数フィールド、ソートなし
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
fieldB,
fieldC
)
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 1
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 2
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 3
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 1
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 2
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 3
}
複数フィールド、単一フィールドでソート
cartesianProduct(
search(collection1, qt="/export", q="*:*", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
fieldB,
fieldC,
productSort="fieldC asc"
)
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 1
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 1
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 2
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 2
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 3
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 3
}
複数フィールド、複数フィールドでソート
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
fieldB,
fieldC,
productSort="fieldC asc, fieldB desc"
)
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 1
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 1
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 2
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 2
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 3
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 3
}
フィールドと評価器、ソートなし
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
sequence(3,4,5) as fieldE,
fieldB
)
{
"fieldA": "valueA",
"fieldB": valueB1,
"fieldC": [1,2,3],
"fieldE": 4
}
{
"fieldA": "valueA",
"fieldB": valueB2,
"fieldC": [1,2,3],
"fieldE": 4
}
{
"fieldA": "valueA",
"fieldB": valueB1,
"fieldC": [1,2,3],
"fieldE": 9
}
{
"fieldA": "valueA",
"fieldB": valueB2,
"fieldC": [1,2,3],
"fieldE": 9
}
{
"fieldA": "valueA",
"fieldB": valueB1,
"fieldC": [1,2,3],
"fieldE": 14
}
{
"fieldA": "valueA",
"fieldB": valueB2,
"fieldC": [1,2,3],
"fieldE": 14
}
上記の例でわかるように、cartesianProduct関数は、複数のフィールドや評価器にわたってタプルをフラット化することをサポートしています。
classify
classify関数は、ロジスティック回帰テキスト分類モデルを使用してタプルを分類します。これは、train関数を使用してトレーニングされたモデルで動作するように特別に設計されました。classify関数は、model関数を使用して格納されたモデルを取得し、そのモデルを使用してタプルのストリームをスコアリングします。分類器によって読み取られるタプルには、分類に使用できるテキストフィールドが含まれている必要があります。classify 関数は、Lucene アナライザーを使用してテキストから特徴を抽出するため、モデルを適用できます。デフォルトでは、classify関数は、タプルのテキストフィールドの名前を使用してアナライザーを探します。ワーカーノードの Solr スキーマにこのフィールドが含まれていない場合は、analyzerFieldパラメータを指定して別のフィールドでアナライザーを検索できます。
分類される各タプルには、2 つのスコアが割り当てられます
-
probability_d*:タプルがクラスに属する確率を表す 0 から 1 までの浮動小数点数。これは分類のユースケースで役立ちます。
-
score_d*:0 から 1 の間に圧縮されていないドキュメントのスコア。スコアは正または負になります。スコアが高いほど、ドキュメントがクラスに適合します。この圧縮されていないスコアは、クエリの再ランキングや推奨のユースケースで役立ちます。このスコアは、複数の上位ドキュメントの probability_d スコアが 1 の場合、ドキュメント間で意味のあるランキングを提供しない場合に特に役立ちます。
classify のパラメータ
-
model expression:(必須)格納されたロジスティック回帰モデルを取得します。 -
field:(必須)分類器を適用するタプルのフィールド。デフォルトでは、スキーマ内のこのフィールドのアナライザーが特徴を抽出するために使用されます。 -
analyzerField:(オプション)スキーマ内でアナライザーを検索するための別のフィールドを指定します。
classify の構文
classify(model(modelCollection,
id="model1",
cacheMillis=5000),
search(contentCollection,
q="id:(a b c)",
qt="/export",
fl="text_t, id",
sort="id asc"),
field="text_t")
上記の例では、classify expression は api 関数を使ってモデルを取得しています。次に、search 関数によって返されたタプルを分類しています。text_t フィールドはテキスト分類に使用され、Solr スキーマの text_t フィールドのアナライザーはテキストを分析して特徴量を抽出するために使用されます。
commit
commit 関数は単一のストリーム (A) をラップし、コレクションとバッチサイズが与えられると、バッチサイズが満たされたとき、またはストリームの終わりに達したときに、コレクションにコミットメッセージを送信します。コミットストリームは、更新ストリームと最も頻繁に使用されるため、コミットは更新ストリームから来る可能性のあるサマリータプルを考慮に入れます。コミットストリームに入るすべてのタプルは、コミットストリームから返されます。タプルが削除されたり、追加されたりすることはありません。
commit パラメータ
-
collection: コミットメッセージを送信するコレクション (必須) -
batchSize: コミットバッチサイズ。バッチサイズに達するとコミットメッセージを送信します。提供されていない場合 (または値 0 で提供された場合)、コミットは着信ストリームの最後にのみ送信されます。 -
waitFlush: コミットハンドラーに直接渡される値 (true/false、デフォルト: false) -
waitSearcher: コミットハンドラーに直接渡される値 (true/false、デフォルト: false) -
softCommit: コミットハンドラーに直接渡される値 (true/false、デフォルト: false) -
StreamExpression for StreamA(必須)
complement
complement 関数は2つのストリーム (A と B) をラップし、B に存在しない A からのタプルを発行します。タプルは、ストリーム A に現れる順序で発行されます。両方のストリームは、等価性を判断するために使用されるフィールド (on パラメータを使用) でソートする必要があります。
complement パラメータ
-
StreamExpression for StreamA -
StreamExpression for StreamB -
on: A と B の間のタプルの等価性をチェックするために使用されるフィールド。on="fieldName"、on="fieldNameInLeft=fieldNameInRight"、またはon="fieldName, otherFieldName=rightOtherFieldName"の形式にすることができます。
complement 構文
complement(
search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc"),
on="a_i"
)
complement(
search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
on="a_i,a_s"
)
daemon
daemon 関数は別の関数をラップし、内部スレッドを使用して一定間隔で実行します。daemon 関数は、継続的なプッシュストリーミングとプルストリーミングの両方を提供するために使用できます。
継続的なプッシュストリーミング
継続的なプッシュストリーミングでは、daemon 関数は別の関数をラップし、実行のために /stream ハンドラーに送信されます。/stream ハンドラーは daemon 関数を認識し、それをメモリに常駐させて、内部関数を一定間隔で実行できるようにします。
タプルのプッシュを容易にするために、daemon 関数はタプルをどこかにプッシュする別のストリームデコレータをラップする必要があります。この例の1つは、ストリームをラップし、インデックス作成のために別の SolrCloud コレクションにタプルを送信する update 関数です。
daemon 構文
daemon(id="uniqueId",
runInterval="1000",
terminate="true",
update(destinationCollection,
batchSize=100,
topic(checkpointCollection,
topicCollection,
q="topic query",
fl="id, title, abstract, text",
id="topicId",
initialCheckpoint=0)
)
)
上記のサンプルコードは、topic 関数をラップしている update 関数をラップしている daemon 関数を示しています。この式が /stream ハンドラーに送信されると、/stream ハンドラーは daemon 関数を認識し、それをメモリに保持して一定間隔で実行します。この特定の例では、daemon 関数は update 関数を毎秒実行します。update 関数は、topic 関数をラップしており、topic 関数クエリに一致するタプルをバッチでストリーミングします。トピックへの後続の呼び出しごとに、トピックの次のタプルのバッチが返されます。update 関数は、トピックに一致するすべてのタプルを別のコレクションに送信してインデックスを作成します。terminate パラメータは、topic 関数がタプルの送信を停止したときにデーモンを終了するように指示します。
これにより、特定のクエリに一致するドキュメントが別のコレクションにプッシュされるという効果が得られます。カスタムプッシュ関数をプラグインして、ドキュメントを Solr から Kafka やメールシステムなどの他のシステムにプッシュできます。
プッシュストリーミングは、バックグラウンドで集計が一定間隔でロールアップされ、他の Solr コレクションにプッシュされる継続的なバックグラウンド集計シナリオにも使用できます。もう1つのユースケースは、継続的なバックグラウンド機械学習モデルの最適化です。最適化されたモデルは、クエリに統合できる別の Solr コレクションにプッシュされます。
/stream ハンドラーは、デーモン関数を一覧表示および制御するための小さなコマンドセットをサポートしています
https://:8983/solr/collection/stream?action=list
このコマンドは、特定のノードで実行されている現在のデーモンと、その現在の状態の一覧を提供します。
https://:8983/solr/collection/stream?action=stop&id=daemonId
このコマンドは、特定のデーモン関数を停止しますが、メモリに常駐させたままにします。
https://:8983/solr/collection/stream?action=start&id=daemonId
このコマンドは、停止した特定のデーモン関数を開始します。
https://:8983/solr/collection/stream?action=kill&id=daemonId
このコマンドは、特定のデーモン関数を停止し、メモリから削除します。
継続的なプルストリーミング
DaemonStream java クラス (SolrJ ライブラリの一部) を Java アプリケーションに埋め込んで、継続的なプルストリーミングを提供することもできます。サンプルコード
StreamContext context = new StreamContext()
SolrClientCache cache = new SolrClientCache();
context.setSolrClientCache(cache);
Map topicQueryParams = new HashMap();
topicQueryParams.put("q","hello"); // The query for the topic
topicQueryparams.put("rows", "500"); // How many rows to fetch during each run
topicQueryparams.put("fl", "id", "title"); // The field list to return with the documents
TopicStream topicStream = new TopicStream(zkHost, // Host address for the ZooKeeper service housing the collections
"checkpoints", // The collection to store the topic checkpoints
"topicData", // The collection to query for the topic records
"topicId", // The id of the topic
-1, // checkpoint every X tuples, if set -1 it will checkpoint after each run.
topicQueryParams); // The query parameters for the TopicStream
DaemonStream daemonStream = new DaemonStream(topicStream, // The underlying stream to run.
"daemonId", // The id of the daemon
1000, // The interval at which to run the internal stream
500); // The internal queue size for the daemon stream. Tuples will be placed in the queue
// as they are read by the internal thread.
// Calling read() on the daemon stream reads records from the internal queue.
daemonStream.setStreamContext(context);
daemonStream.open();
//Read until it's time to shutdown the DaemonStream. You can define the shutdown criteria.
while(!shutdown()) {
Tuple tuple = daemonStream.read() // This will block until tuples become available from the underlying stream (TopicStream)
// The EOF tuple (signaling the end of the stream) will never occur until the DaemonStream has been shutdown.
//Do something with the tuples
}
// Shutdown the DaemonStream.
daemonStream.shutdown();
//Read the DaemonStream until the EOF Tuple is found.
//This allows the underlying stream to perform an orderly shutdown.
while(true) {
Tuple tuple = daemonStream.read();
if(tuple.EOF) {
break;
} else {
//Do something with the tuples.
}
}
//Finally close the stream
daemonStream.close();
delete
delete 関数は他の関数をラップし、見つかった id および _version_ 値を使用して、タプルをID で削除コマンドとして SolrCloud コレクションに送信します。
これは、以下で説明する update() 関数に似ています。
delete パラメータ
-
destinationCollection: (必須) タプルが削除されるコレクション。 -
batchSize: (オプション、デフォルトは250) 削除バッチサイズ。 -
pruneVersionField: (オプション、デフォルトはfalse) タプルから_version_値を削除するかどうか -
StreamExpression: (必須)
delete 構文
delete(collection1,
batchSize=500,
search(collection1,
q=old_data:true,
qt="/export",
fl="id",
sort="a_f asc, a_i asc"))
上記の例では、collection1 に対して search 関数によって返されたタプルを消費し、検出された各ドキュメントの id 値を同じ collection1 に対する削除リクエストに変換します。
|
同時更新を無視して、一致するすべてのドキュメントを削除したいユーザーは、 同時更新を予測し、失敗した削除を「スキップ」したいユーザーは、 |
eval
eval 関数を使用すると、新しいストリーミング式をオンザフライで生成して評価するユースケースに対応できます。eval 関数はストリーミング式をラップし、基礎となるストリームから単一のタプルを読み取ります。次に、eval 関数はタプルの expr_s フィールドから文字列ストリーミング式を取得します。次に、eval 関数は文字列ストリーミング式をコンパイルし、タプルを発行します。
executor
executor 関数はストリーミング式を含むストリームソースをラップし、式を並行して実行します。executor 関数は、各タプルの expr_s フィールドで式を探します。executor 関数には、同じワーカーノード上で並行して式をコンパイルおよび実行するタスクを実行する内部スレッドプールがあります。この関数は、クラスタ全体で式の並列実行を提供するために、parallel 関数でラップすることにより、ワーカーノード間で並列化することもできます。
executor 関数は、実行する式の出力に対して特定のことは行いません。したがって、実行される式には、タプルを宛先にプッシュするロジックが含まれている必要があります。update 関数を実行される式に含めて、タプルを保存のために SolrCloud コレクションに送信できます。
このモデルにより、ジョブの進行中にアクセスできる SolrCloud コレクションに出力が保存されるジョブの非同期実行が可能になります。
executor パラメータ
-
threads: (オプション) 式を実行するためのエグゼキュータースレッドプールのスレッド数。 -
StreamExpression: (必須) 実行するストリーミング式を含むストリームソース。
fetch
fetch 関数はストリームを反復処理し、追加のフィールドをフェッチしてタプルに追加します。fetch 関数は、Solr への呼び出し回数を制限するためにバッチでフェッチします。fetch 関数からストリーミングされたタプルには、元のフィールドとフェッチされた追加のフィールドが含まれます。fetch 関数は1対1のフェッチをサポートしています。ストリームソースに重複キーが含まれている多対1のフェッチも機能しますが、この関数では現在、多対多のフェッチはサポートされていません。
having
having 式はストリームをラップし、各タプルにブール演算を適用します。ブール演算がtrueを返すタプルのみを発行します。
having パラメータ
-
StreamExpression: (必須) having 関数のストリームソース。 -
booleanEvaluator: (必須) 次のブール演算がサポートされています:eq(等しい)、gt(より大きい)、lt(より小さい)、gteq(以上)、lteq(以下)、and、or、eor(排他的 OR)、およびnot。ブール評価器は、他の評価器とネストして、複雑なブールロジックを形成できます。
比較評価器は、特定のフィールドの値と、文字列、数値、またはブール値などの値を比較します。例:eq(field1, 10) は、field1 が10と等しい場合に true を返します。
leftOuterJoin
leftOuterJoin 関数は、LeftストリームとRightストリームの2つのストリームをラップし、Leftからのタプルを出力します。on で定義されたように等しいRightのタプルがある場合、そのタプルの値は出力されるタプルに含まれます。Leftタプルが出力されるために、Rightに等しいタプルが必ずしも存在する必要はありません。これは、1対1、1対多、多対1、および多対多の左外部結合シナリオをサポートします。タプルは、Leftストリームに表示される順序で出力されます。両方のストリームは、等価性を判断するために使用されるフィールドでソートされている必要があります(on パラメータを使用)。両方のタプルに同じ名前のフィールドが含まれている場合、Rightストリームの値が出力されるタプルで使用されます。
出力されるタプルに含めるフィールド値を指定するために、入力ストリームを select 関数でラップできます。
leftOuterJoin パラメータ
-
StreamLeftのStreamExpression -
StreamRightのStreamExpression -
on: LeftとRight間のタプルの等価性を確認するために使用されるフィールド。on="fieldName"、on="fieldNameInLeft=fieldNameInRight"、またはon="fieldName, otherFieldName=rightOtherFieldName"の形式にすることができます。
leftOuterJoin 構文
leftOuterJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
on="personId"
)
leftOuterJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
on="personId=ownerId"
)
leftOuterJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
select(
search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
ownerId,
name as petName
),
on="personId=ownerId"
)
hashJoin
hashJoin 関数は、LeftとRightの2つのストリームをラップし、Rightに存在するLeftのすべてのタプルに対して、両方のタプルのフィールドを含むタプルを出力します。これは、1対1、1対多、多対1、および多対多の内部結合シナリオをサポートします。タプルは、Leftストリームに表示される順序で出力されます。ストリームの順序は重要ではありません。両方のタプルに同じ名前のフィールドが含まれている場合、Rightストリームの値が出力されるタプルで使用されます。
出力されるタプルに含めるフィールド値を指定するために、入力ストリームを select 関数でラップできます。
LeftとRightのタプルを同じ順序に並べることができない場合、hashJoin関数を使用できます。タプルが順不同であるため、このストリームは、open操作中にRightストリームからすべての値を読み取り、すべてのタプルをメモリに格納することによって機能します。この結果、メモリ使用量はRightストリームのサイズに等しくなります。
hashJoin パラメータ
-
StreamLeftのStreamExpression -
hashed=StreamRightのStreamExpression -
on: LeftとRight間のタプルの等価性を確認するために使用されるフィールド。on="fieldName"、on="fieldNameInLeft=fieldNameInRight"、またはon="fieldName, otherFieldName=rightOtherFieldName"の形式にすることができます。
hashJoin 構文
hashJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
hashed=search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
on="personId"
)
hashJoin(
search(people, q="*:*", fl="personId,name", sort="personId asc"),
hashed=search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
on="personId=ownerId"
)
hashJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
hashed=select(
search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
ownerId,
name as petName
),
on="personId=ownerId"
)
innerJoin
LeftストリームとRightストリームの2つのストリームをラップします。Rightに存在するLeftのすべてのタプルに対して、両方のタプルのフィールドを含むタプルが出力されます。これは、1対1、1対多、多対1、および多対多の内部結合シナリオをサポートします。タプルは、Leftストリームに表示される順序で出力されます。両方のストリームは、等価性を判断するために使用されるフィールド(「on」パラメータ)でソートされている必要があります。両方のタプルに同じ名前のフィールドが含まれている場合、Rightストリームの値が出力されるタプルで使用されます。出力されるタプルに含めるフィールド値を指定するために、入力ストリームを select(…) 式でラップできます。
innerJoin パラメータ
-
StreamLeftのStreamExpression -
StreamRightのStreamExpression -
on: LeftとRight間のタプルの等価性を確認するために使用されるフィールド。on="fieldName"、on="fieldNameInLeft=fieldNameInRight"、またはon="fieldName, otherFieldName=rightOtherFieldName"の形式にすることができます。
innerJoin 構文
innerJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
on="personId"
)
innerJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
on="personId=ownerId"
)
innerJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
select(
search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
ownerId,
name as petName
),
on="personId=ownerId"
)
intersect
intersect 関数は、AストリームとBストリームの2つのストリームをラップし、Bに存在するAからのタプルを出力します。タプルは、Aストリームに表示される順序で出力されます。両方のストリームは、等価性を判断するために使用されるフィールド(on パラメータ)でソートされている必要があります。Aからのタプルのみが出力されます。
intersect パラメータ
-
StreamExpression for StreamA -
StreamExpression for StreamB -
on: A と B の間のタプルの等価性をチェックするために使用されるフィールド。on="fieldName"、on="fieldNameInLeft=fieldNameInRight"、またはon="fieldName, otherFieldName=rightOtherFieldName"の形式にすることができます。
intersect 構文
intersect(
search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc"),
on="a_i"
)
intersect(
search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
on="a_i,a_s"
)
list
list 関数は、N個のストリーム式をラップし、各ストリームを順番に開き、反復処理します。これにより、複数のストリーミング式の結果が連結されます。
list 構文
list(tuple(a="hello world"), tuple(a="HELLO WORLD"))
list(search(collection1, q="*:*", fl="id, prod_ss", sort="id asc"),
search(collection2, q="*:*", fl="id, prod_ss", sort="id asc"))
list(tuple(a=search(collection1, q="*:*", fl="id, prod_ss", sort="id asc")),
tuple(a=search(collection2, q="*:*", fl="id, prod_ss", sort="id asc")))
merge
merge 関数は、2つ以上のストリーミング式をマージし、基になるストリームの順序を維持します。順序が維持されるため、基になるストリームのソートは、merge関数に提供されるonパラメータと一致する必要があります。
merge パラメータ
-
StreamExpression A -
StreamExpression B -
オプションのStreamExpression C、D、…。Z -
on: マージを実行するためのソート条件。形式はfieldName orderで、orderはascまたはdescです。複数のフィールドは、形式fieldA order, fieldB orderで指定できます。
merge 構文
# Merging two stream expressions together
merge(
search(collection1,
q="id:(0 3 4)",
qt="/export",
fl="id,a_s,a_i,a_f",
sort="a_f asc"),
search(collection1,
q="id:(1)",
qt="/export",
fl="id,a_s,a_i,a_f",
sort="a_f asc"),
on="a_f asc")
# Merging four stream expressions together. Notice that while the sorts of each stream are not identical they are
# comparable. That is to say the first N fields in each stream's sort matches the N fields in the merge's on clause.
merge(
search(collection1,
q="id:(0 3 4)",
qt="/export",
fl="id,fieldA,fieldB,fieldC",
sort="fieldA asc, fieldB desc"),
search(collection1,
q="id:(1)",
qt="/export",
fl="id,fieldA",
sort="fieldA asc"),
search(collection2,
q="id:(10 11 13)",
qt="/export",
fl="id,fieldA,fieldC",
sort="fieldA asc"),
search(collection3,
q="id:(987)",
qt="/export",
fl="id,fieldA,fieldC",
sort="fieldA asc"),
on="fieldA asc")
null
null式は、並列関係代数(結合、インターセクション、ロールアップなど)を実行する際のボトルネックを理解するための便利なユーティリティ関数です。null関数は、基になるストリームからすべてのタプルを読み取り、カウントと処理時間を含む単一のタプルを返します。nullストリーム自体が最小限のオーバーヘッドしか追加しないため、Solrの/exportハンドラーのパフォーマンスを分離するために使用できます。/exportハンドラーのパフォーマンスがボトルネックでない場合、ボトルネックはストリームデコレータが実行されているワーカーで発生している可能性が高くなります。
null式は並列関数でラップし、ワーカーノードに送信できます。このシナリオでは、各ワーカーは、ワーカーで処理されたタプルの数と、そのワーカーのタイミング情報を含む1つのタプルを返します。これにより、次のような貴重な情報が得られます。
-
ワーカーを追加するほど、/exportハンドラーのパフォーマンスが向上するかどうか。
-
タプルがワーカー全体に均等に分散されているか、それともハッシュパーティション分割によって、より多くのドキュメントが単一のワーカーに送信されているか。
-
すべてのワーカーが同じ速度でデータを処理しているか、それともワーカーの1つがボトルネックの原因になっているか。
null 構文
parallel(workerCollection,
null(search(collection1, q="*:*", fl="id,a_s,a_i,a_f", sort="a_s desc", qt="/export", partitionKeys="a_s")),
workers="20",
zkHost="localhost:9983",
sort="a_s desc")
上記の式は、null関数をラップする並列関数を示しています。これにより、null関数が20個のワーカーノードで並列に実行されます。各ワーカーは、処理されたタプルの数と、タプルの反復処理にかかった時間を含む単一のタプルを返します。
outerHashJoin
outerHashJoin 関数は、LeftストリームとRightストリームの2つのストリームをラップし、Leftからのタプルを出力します。on パラメータで定義されたように、等しいRightのタプルがある場合、そのタプルの値は出力されるタプルに含まれます。Leftタプルが出力されるために、Rightに等しいタプルが必ずしも存在する必要はありません。これは、1対1、1対多、多対1、および多対多の左外部結合シナリオをサポートします。タプルは、Leftストリームに表示される順序で出力されます。ストリームの順序は重要ではありません。両方のタプルに同じ名前のフィールドが含まれている場合、Rightストリームの値が出力されるタプルで使用されます。
出力されるタプルに含めるフィールド値を指定するために、入力ストリームを select 関数でラップできます。
LeftとRightのタプルを同じ順序に並べることができない場合、outerHashJoinストリームを使用できます。タプルが順不同であるため、このストリームは、open操作中にRightストリームからすべての値を読み取り、すべてのタプルをメモリに格納することによって機能します。この結果、メモリ使用量はRightストリームのサイズに等しくなります。
outerHashJoin パラメータ
-
StreamLeftのStreamExpression -
hashed=StreamRightのStreamExpression -
on: LeftとRight間のタプルの等価性を確認するために使用されるフィールド。on="fieldName"、on="fieldNameInLeft=fieldNameInRight"、またはon="fieldName, otherFieldName=rightOtherFieldName"の形式にすることができます。
outerHashJoin 構文
outerHashJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
hashed=search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
on="personId"
)
outerHashJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
hashed=search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
on="personId=ownerId"
)
outerHashJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
hashed=select(
search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
ownerId,
name as petName
),
on="personId=ownerId"
)
parallel
parallel 関数はストリーミング式をラップし、並列処理するためにN個のワーカーノードに送信します。
parallel 関数では、基になる検索に partitionKeys パラメータを指定する必要があります。partitionKeys パラメータは、検索結果(タプル)をワーカーノード全体にパーティション分割します。partitionKeys と同じ値を持つタプルは、同じワーカーノードにシャッフルされます。
parallel 関数は、ワーカーノードから返されるタプルのソート順序を維持するため、ソート条件は、ワーカーから返されるタプルのソート順序を組み込む必要があります。
たとえば、年、月、日でソートする場合、タプルをワーカーノード全体に分散させるのに十分な異なる年がある限り、年のみをパーティション分割できます。
Solrでは4つ以上のフィールドでソートできますが、速度を考慮して、4つ以下の partitionKeys しか指定できません。また、1つまたは2つのキーでタプルを分散させるのに十分な場合、多くの partitionKeys を指定するのはやりすぎです。
並列ストリームは、基になる検索ストリームがコレクションから多くのタプルを出力する場合に設計されました。検索ストリームがコレクションからデータの小さなサブセットのみを出力する場合、parallel を使用すると遅くなる可能性があります。
|
ワーカーコレクション
ワーカーノードは、データと同じコレクションからのものにすることも、完全に異なるコレクションからのものにすることもでき、 |
parallel パラメータ
-
collection: StreamExpressionを送信するワーカーコレクションの名前。 -
StreamExpression: ワーカーコレクションに送信する式。 -
workers: 式を送信するワーカーコレクション内のワーカーの数。 -
zkHost: (オプション)ワーカーコレクションが存在するZooKeeper接続文字列。Zookeeperの認証情報とACLは、接続しているSolrインスタンスと同じZkHostが使用されている場合にのみ含まれます(chrootは異なる場合があります)。 -
sort: ワーカーノードから返されるタプルを順序付けるためのソート条件。
parallel 構文
parallel(workerCollection,
rollup(search(collection1, q="*:*", fl="id,year_i,month_i,day_i", qt="/export", sort="year_i desc,month_i desc,day_i asc", partitionKeys="year_i"),
over="year_i", count(*)),
workers="20",
zkHost="localhost:9983",
sort="year_i desc")
上記の式は、rollup 関数をラップする parallel 関数を示しています。これにより、rollup 関数が20個のワーカーノードで並列に実行されます。
|
ウォームアップ
同じ数のワーカーと
|
plist
plist 関数は、N個のストリーム式をラップし、ストリームを並列に開き、各ストリームを順番に反復処理します。list と plist の違いは、ストリームが並列に開かれることです。facet、stats、significantTerms などの多くのストリームは、開くとSolrに重い操作をプッシュダウンするため、plist関数はこれらの操作を並列に実行することにより、パフォーマンスを大幅に向上させることができます。
plist 構文
plist(tuple(a="hello world"), tuple(a="HELLO WORLD"))
plist(search(collection1, q="*:*", fl="id, prod_ss", sort="id asc"),
search(collection2, q="*:*", fl="id, prod_ss", sort="id asc"))
plist(tuple(a=search(collection1, q="*:*", fl="id, prod_ss", sort="id asc")),
tuple(a=search(collection2, q="*:*", fl="id, prod_ss", sort="id asc")))
priority
priority 関数は、executor 関数の単純な優先度スケジューラです。executor 関数には、タスクの優先順位付けの概念が直接含まれていません。代わりに、基になるストリームから読み取られた順序でタスクを実行するだけです。priority 関数は、以前に送信された低優先度のタスクよりも前に、より高い優先度のタスクをスケジュールする機能を提供します。
priority 関数は、実行するストリーミング式を含むタプルを出力する2つのtopic 関数をラップします。最初のトピックは、優先度の高いタスクキューと見なされます。
priority関数が呼び出されるたびに、より優先度の高いタスクキューに実行すべきタスクがあるかどうかをチェックします。優先度の高いキューに待機中のタスクがある場合、priority関数は優先度の高いタスクを発行します。実行する優先度の高いタスクがない場合、優先度の低いキューのタスクが発行されます。
priority関数は、呼び出されるたびに、いずれかのキューからタスクのバッチのみを発行します。これにより、優先度の高いキューに実行するタスクがなくなるまで、優先度の低いタスクが実行されないことが保証されます。
priorityの構文
daemon(id="myDaemon",
executor(threads=10,
priority(topic(checkpointCollection, storedExpressions, q="priority:high", fl="id, expr_s", initialCheckPoint=0,id="highPriorityTasks"),
topic(checkpointCollection, storedExpressions, q="priority:low", fl="id, expr_s", initialCheckPoint=0,id="lowPriorityTasks"))))
上記の例では、daemon関数がexecutorを繰り返し呼び出しています。呼び出されるたびに、executor関数はpriority関数によって発行されたタスクを実行します。priority関数は2つのトピックをラップします。最初のトピックは優先度の高いタスクキュー、2番目のトピックは優先度の低いトピックです。
reduce
reduce関数は、内部ストリームをラップし、共通フィールドによってタプルをグループ化します。
各タプルグループは、プラグ可能なreduce操作によって単一のブロックとして操作されます。Solrで提供されるグループ操作は、分散グループ化機能を実装します。このグループ操作は、カスタムreduce操作を構築する際に参照できるreduce操作の例としても機能します。
|
reduce関数は、基になるストリームのソート順序に依存します。したがって、基になるストリームのソート順序は、グループ化フィールドと一致している必要があります。 |
rollup
rollup関数は別のストリーム関数をラップし、バケットフィールドに対して集計をロールアップします。rollup関数は、一度に1つのグループを集計するために、基になるストリームのソート順序に依存します。したがって、基になるストリームのソート順序は、rollup関数のoverパラメーターのフィールドと一致している必要があります。
rollup関数は、集計を実行するために結果セット全体を処理する必要もあります。基になるストリームがsearch関数の場合、/exportハンドラーを使用して、ソートされた結果セット全体をrollup関数に提供できます。このソートされたアプローチにより、rollup関数は非常にカーディナリティの高いフィールドに対して集計を実行できます。このアプローチの欠点は、集計するためにタプルをソートして、ネットワーク経由でワーカーノードにストリーミングする必要があることです。低から中程度のカーディナリティフィールドに対してより高速な集計を行うには、facet関数を使用できます。
rollupのパラメータ
-
StreamExpression(必須) -
over: (必須)グループ化するフィールドのリスト。 -
metrics: (必須)計算するメトリクスのリスト。現在サポートされているメトリクスは、sum(col)、avg(col)、min(col)、max(col)、count(*)です。
rollupの構文
rollup(
search(collection1, q="*:*", qt="/export", fl="a_s,a_i,a_f", qt="/export", sort="a_s asc"),
over="a_s",
sum(a_i),
sum(a_f),
min(a_i),
min(a_f),
max(a_i),
max(a_f),
avg(a_i),
avg(a_f),
count(*)
)
上記の例は、search関数をラップするrollup関数を示しています。search関数は、結果セット全体をrollupストリームに提供するために/exportハンドラーを使用していることに注意してください。また、search関数のsortパラメーターがrollupのoverパラメーターと一致していることに注意してください。これにより、rollup関数は、a_sフィールドに対して、一度に1つのグループをロールアップできます。
scoreNodes
グラフ探索のセクションを参照してください。
select
select関数はストリーミング式をラップし、入力タプルからのフィールドのサブセットまたは変更されたセットを含むタプルを出力します。出力タプルに含まれるフィールドのリストには、フィールドを効果的に名前変更するためのエイリアスを含めることができます。selectストリームは、操作と評価器の両方をサポートしています。replace、add、ifなど、任意のフィールドで実行する操作と評価器のリストを提供できます。
selectのパラメータ
-
StreamExpression -
fieldName: 出力タプルに含めるフィールドの名前(これらの複数を含めることができます)。例:outputTuple[fieldName] = inputTuple[fieldName]。fieldNameはワイルドカードパターンにすることができます。たとえば、a_*はa_で始まるすべてのフィールドを選択します。 -
fieldName as aliasFieldName: 出力タプルに含めるエイリアス付きフィールド名(これらの複数を含めることができます)。例:outputTuple[aliasFieldName] = incomingTuple[fieldName] -
replace(fieldName, value, withValue=replacementValue):incomingTuple[fieldName] == valueの場合、outgoingTuple[fieldName]はreplacementValueに設定されます。valueは、null値を他の値に置き換えるために文字列「null」にすることができます。 -
replace(fieldName, value, withField=otherFieldName):incomingTuple[fieldName] == valueの場合、outgoingTuple[fieldName]はincomingTuple[otherFieldName]の値に設定されます。valueは、null値を他の値に置き換えるために文字列「null」にすることができます。
selectの構文
// output tuples with fields teamName, wins, losses, and winPercentages where a null value for wins or losses is translated to the value of 0
select(
search(collection1, fl="id,teamName_s,wins,losses", q="*:*", qt="/export", sort="id asc"),
teamName_s as teamName,
wins,
losses,
replace(wins,null,withValue=0),
replace(losses,null,withValue=0),
if(eq(0,wins), 0, div(add(wins,losses), wins)) as winPercentage
)
sort
sort関数はストリーミング式をラップし、タプルの順序を変更します。sort関数は、新しいソート順序ですべての入力タプルを発行します。sort関数は、入力ストリームからすべてのタプルを読み取り、O(nlog(n))のパフォーマンス特性を持つアルゴリズムを使用してそれらの順序を変更します。ここで、nは入力ストリーム内のタプルの総数です。次に、新しいソート順序でタプルを出力します。すべてのタプルがメモリに読み込まれるため、この関数のメモリ消費量は、入力ストリーム内のタプルの数に比例して増加します。
sortの構文
次の式は、犬の飼い主を見つけ、飼い主とペットの名前で結果を並べ替えます。これは、最初に人/飼い主のIDで並べ替え、次に最終出力を飼い主とペットの名前で並べ替えることにより、効率的なinnerJoinを使用していることに注意してください。
sort(
innerJoin(
search(people, q="*:*", qt="/export", fl="id,name", sort="id asc"),
search(pets, q="type:dog", qt="/export", fl="owner,petName", sort="owner asc"),
on="id=owner"
),
by="name asc, petName asc"
)
top
top関数はストリーミング式をラップし、タプルの順序を変更します。top関数は、新しいソート順序で上位N個のタプルのみを発行します。top関数は、基になるストリームの順序を変更するため、ソート条件は基になるストリームと一致する必要はありません。
unique
unique関数はストリーミング式をラップし、overパラメーターに基づいて一意のタプルストリームを発行します。unique関数は、基になるストリームのソート順序に依存します。overパラメーターは、基になるストリームのソート順序と一致している必要があります。
unique関数は、非コロケーションの一意のアルゴリズムを実装します。これは、同じ一意のoverフィールドを持つレコードが、同じシャードにコロケーションされている必要がないことを意味します。並列で実行する場合、partitionKeysパラメーターは、同じキーを持つレコードが同じワーカーにシャッフルされるように、一意のoverフィールドと同じである必要があります。
update
update関数は、別の関数をラップし、タプルをドキュメントとしてインデックス化するためにSolrCloudコレクションに送信します。
updateのパラメータ
-
destinationCollection: (必須)タプルがインデックス化されるコレクション。 -
batchSize: (オプション、デフォルトは250)インデックス作成のバッチサイズ。 -
pruneVersionField: (オプション、デフォルトはtrue)タプルから_version_値を削除するかどうか。 -
StreamExpression: (必須)
updateの構文
update(destinationCollection,
batchSize=500,
search(collection1,
q=*:*,
qt="/export",
fl="id,a_s,a_i,a_f,s_multi,i_multi",
sort="a_f asc, a_i asc"))
上記の例は、search関数から返されたタプルを、インデックス化されるdestinationCollectionに送信します。
この例に示すようにsearch(…)をラップすることは、このデコレータの一般的な使用法です。コレクションからドキュメントをタプルとして読み取り、何らかの方法で処理または変更してから、新しいコレクションに戻します。このため、pruneVersionField=trueがデフォルトの動作です。これは、タプルをSolrドキュメントに変換するときに内部ストリームで見つかった_version_値を削除して、オプティミスティック同時実行性の制約による予期しないエラーを防ぐためです。