メッセージ処理ループ、ツール呼び出し、ストリーミング、コンパクション
PI Embedded Agent Runtimeは、OpenClawの中核エンジンです。
@mariozechner/pi-agent-core および @mariozechner/pi-coding-agent SDKを
ラップし、プロセス内で動作するステートフルなストリーミングエージェントループを実現しています。
src/agents/pi-embedded-runner/run.ts:177-183
メッセージはセッション固有レーンとグローバルレーンの両方にキューイングされ、 セッションごとの逐次処理を保証します。4種類のレーンがあります:
| レーン | 用途 |
|---|---|
Main | メインセッションの処理 |
Cron | スケジュールタスクの処理 |
Subagent | サブエージェントの処理 |
Nested | ネストされたサブエージェント |
src/agents/pi-embedded-runner/run.ts:307-471
マルチプロファイル認証をサポートし、クールダウン/ローテーション機能があります。 あるプロファイルが失敗した場合、自動的に次のプロファイルに切り替わります。
src/agents/pi-embedded-runner/run/attempt.ts:291-329
createOpenClawCodingTools() がフルツールセットを生成します。
ツールは builtInTools と customTools に分割され、
マルチレイヤーのポリシーフィルタリングを通過します。
src/agents/pi-embedded-runner/run/attempt.ts:490-589
acquireSessionWriteLock() で並行書き込みを防止SessionManager.open(params.sessionFile) でJSONLファイルから状態を読み込みsanitizeSessionHistory() → プロバイダ固有のクリーンアップvalidateGeminiTurns() / validateAnthropicTurns() → ターン順序の検証limitHistoryTurns() → 設定に基づく履歴長の上限sanitizeToolUseResultPairing() → 孤立したツール結果の修復
activeSession.steer(text) により、実行中のストリーミングランにメッセージを注入できます。
これにより、エージェントの処理中にも追加の指示を送ることが可能です。
src/agents/pi-embedded-subscribe.ts:33-717
サブスクリプションシステムは、SDKイベントとユーザー向け出力の橋渡しを行います。
| イベント | ハンドラ | 実行モード |
|---|---|---|
message_start | handleMessageStart | 同期 |
message_update | handleMessageUpdate | 同期 |
message_end | handleMessageEnd | 同期 |
tool_execution_start | handleToolExecutionStart | 非同期 (best-effort) |
tool_execution_update | handleToolExecutionUpdate | 同期 |
tool_execution_end | handleToolExecutionEnd | 非同期 (best-effort) |
auto_compaction_start | handleAutoCompactionStart | 同期 |
auto_compaction_end | handleAutoCompactionEnd | 同期 |
assistantTexts[] — 蓄積された最終テキストtoolMetas[] — ツール呼び出しメタデータblockState — <think>/<final>タグのステートフル解析deltaBuffer / blockBuffer — ストリーミングバッファmessagingToolSentTexts[] — 重複排除追跡compactionInFlight / pendingCompactionRetry — コンパクション調整src/agents/tool-loop-detection.ts
エージェントが無限ループに陥ることを防ぐ洗練されたシステムです。 直近30回のツール呼び出しをスライディングウィンドウで追跡します。
| 検出器 | 閾値 | アクション |
|---|---|---|
| 汎用リピート検出 | 10回で警告、20回でクリティカル | 警告メッセージ注入 |
| ポーリング無進展検出 | ポーリングツールの状態変化なし | 停止促進 |
| ピンポン検出 | 交互呼び出しパターン | パターン通知 |
| グローバル回路遮断器 | 30回同一無進展呼び出し | 強制停止 |
SHA-256ハッシュ(ツール名 + 安定シリアライズされたパラメータ)でパターンマッチングを行います。
src/agents/compaction.ts、src/agents/pi-embedded-runner/compact.ts
| モード | トリガー | 詳細 |
|---|---|---|
| 自動コンパクション | SDKがコンテキスト上限接近時に発動 | auto_compaction_start/endイベントで観測 |
| 明示コンパクション | オーバーフロー回復時に呼び出し | compactEmbeddedPiSessionDirect() |
estimateMessagesTokens() + 安全マージン(1.2倍)splitMessagesByTokenShare() — 均等トークン分割chunkMessagesByMaxTokens() — 最大トークンでチャンクsummarizeInStages()
pruneHistoryForContextShare() で最も古いチャンクから削除(最大コンテキストの50%予算)単純なAPI呼び出しと異なる点:
ACTIVE_EMBEDDED_RUNS マップでアクティブランを追跡