# kiwi-go-agent

AI agent development standards using golanggraph for graph-based workflows, langchaingo for LLM calls, tool integration, MCP, and LLM best practices (context compression, prompt caching, attention raising, tool response trimming).
npx skill4agent add yet-another-ai-project/kiwi-skills kiwi-go-agent

| Framework | Purpose | Import |
|---|---|---|
| golanggraph | Graph-based agent workflow engine | |
| langchaingo | LLM calls, prompts, tool definitions | |
state.Statetype State struct {
History []llms.MessageContent // Conversation messages (system, human, AI, tool)
Metadata map[string]interface{} // Custom data shared between nodes
}HistoryMetadataflowcontract.Nodetype Node interface {
Name() string
Run(ctx context.Context, currentState *state.State, streamFunc flowcontract.StreamFunc) error
}Name()Run()currentStatestreamFuncstate.States.GetLastResponse() string // Last AI message text
s.GetResumeValue() interface{} // Value passed via ResumeWithValue()
s.IsInterrupted() bool // Whether flow was interrupted
s.GetThreadID() string // Current thread ID (for checkpointing)
s.SetInterruptPayload(payload) // Set interrupt payload before Interrupt()// Direct edge: always flows From -> To
edge.Edge{From: "node_a", To: "node_b"}
// Conditional edge: ConditionFunc decides which target
edge.Edge{
From: "node_a",
ConditionalTo: []string{"node_b", "node_c", flow.EndNode},
ConditionFunc: func(ctx context.Context, st state.State) (string, error) {
// Return one of ConditionalTo values
return "node_b", nil
},
}compiledFlow, err := flow.NewFlowBuilder(logger).
SetName("my_agent").
SetCheckpointer(checkpointer.NewInMemoryCheckpointer()).
AddNode(nodeA).
AddNode(nodeB).
AddEdge(edge.Edge{From: flow.StartNode, To: nodeA.Name()}).
AddEdge(edge.Edge{From: nodeA.Name(), To: nodeB.Name()}).
AddEdge(edge.Edge{From: nodeB.Name(), To: flow.EndNode}).
Compile()// Without streaming
finalState, err := compiledFlow.Exec(ctx, initialState, nil)
// With streaming callback
finalState, err := compiledFlow.Exec(ctx, initialState, func(ctx context.Context, event *flowcontract.FlowStreamEvent) error {
// event.Chunk contains streamed data
// event.FullState contains current flow state
return nil
})agent.NewAgent()a, err := agent.NewAgent(
agent.WithName("my_agent"),
agent.WithModel(llm),
agent.WithTools([]tools.ITool{searchTool, fileTool}),
agent.WithMaxToolCalls(10), // Prevent infinite loops (default: 10)
agent.WithContextWindow(20), // Auto-compress history, keep last 20 msgs
agent.WithResponseValidator(validatorFunc), // Optional: validate LLM output
agent.WithSubAgent("researcher", researcherAgent), // Optional: delegate to sub-agents
agent.WithBeforeModelHook(beforeHook), // Optional: run before LLM call
agent.WithAfterModelHook(afterHook), // Optional: run after LLM call
agent.WithBeforeToolsHook(beforeToolsHook), // Optional: run before tool execution
agent.WithAfterToolsHook(afterToolsHook), // Optional: run after tool execution
agent.WithLogger(logger),
)flowcontract.Node// Standalone execution
finalState, err := a.Run(ctx, &initialState, streamFunc)
// Or as a node in a flow
flow.NewFlowBuilder(logger).AddNode(a)...ChatNodeToolsNodeToolConditiontools.NewToolsmodel.NewModelNodeSTART -> CustomGenNode -> (has tool calls?) -> ToolsNode -> CustomGenNode (loop)
-> (no tool calls) -> ValidationNode -> ENDtoolcondition.NewToolCondition()model.NewModelNodechat.NewChatNodeagent.WithSubAgent()delegate_taskresearcher, _ := agent.NewAgent(agent.WithName("researcher"), agent.WithModel(llm), agent.WithTools(researchTools))
writer, _ := agent.NewAgent(agent.WithName("writer"), agent.WithModel(llm))
orchestrator, _ := agent.NewAgent(
agent.WithName("orchestrator"),
agent.WithModel(llm),
agent.WithSubAgent("researcher", researcher),
agent.WithSubAgent("writer", writer),
)delegate_taskagent_nametaskSTART -> GenerationNode -> ValidationNode -> (has errors?) -> FixNode -> ValidationNode (loop)
          -> (valid?) -> END

| Component | Import | Purpose |
|---|---|---|
| | RECOMMENDED All-in-one ReAct agent with tools, hooks, validation, delegation |
| | Generic LLM model node (replaces deprecated |
| | Executes tool calls from history |
| | Conditional edge: routes to tools or next node |
| | Creates langchaingo LLM instance |
| | In-memory state checkpointing |
| | Redis-backed state checkpointing (for production) |
tools.ITooltype ITool interface {
Tools(ctx context.Context) []llms.Tool // Return tool definitions
Run(ctx context.Context, toolCall llms.ToolCall) (llms.ToolCallResponse, error) // Execute a single tool call
}Runllms.ToolCallllms.ToolCallResponseToolsNodefunc (t *MyTool) Tools(ctx context.Context) []llms.Tool {
return []llms.Tool{{
Type: "function",
Function: &llms.FunctionDefinition{
Name: "my_tool",
Description: "What this tool does",
Parameters: map[string]any{
"type": "object",
"properties": map[string]any{
"query": map[string]any{"type": "string", "description": "Search query"},
},
"required": []string{"query"},
},
},
}}
}func (t *MyTool) Run(ctx context.Context, toolCall llms.ToolCall) (llms.ToolCallResponse, error) {
// Parse arguments from the tool call
var args struct {
Query string `json:"query"`
}
if err := json.Unmarshal([]byte(toolCall.FunctionCall.Arguments), &args); err != nil {
return llms.ToolCallResponse{}, xerror.Wrap(err)
}
// Execute tool logic
result, err := t.execute(ctx, args.Query)
if err != nil {
// Return error as tool response (don't fail the agent)
return llms.ToolCallResponse{
ToolCallID: toolCall.ID,
Name: toolCall.FunctionCall.Name,
Content: "Error: " + err.Error(),
}, nil
}
return llms.ToolCallResponse{
ToolCallID: toolCall.ID,
Name: toolCall.FunctionCall.Name,
Content: result,
}, nil
}llms.Model.GenerateContentcompletion, err := llm.GenerateContent(
ctx,
messages, // []llms.MessageContent
llms.WithTemperature(0.7), // Creativity control
llms.WithTools(toolDefinitions), // Optional tool definitions
llms.WithMaxTokens(4096), // Optional max tokens
)
if err != nil { return xerror.Wrap(err) }
choice := completion.Choices[0]
// choice.Content = text response
// choice.ToolCalls = tool calls (if any)ctx = context.WithValue(ctx, utils.OverrideModelKey, config.SpecificModel)
completion, err := llm.GenerateContent(ctx, messages, ...)//go:embed//go:embed prompt.txt
var promptTemplate stringprompts.NewPromptTemplatetmpl := prompts.NewPromptTemplate(promptTemplate, []string{"var1", "var2"})
formatted, err := tmpl.Format(map[string]any{"var1": "value1", "var2": "value2"})const MetadataKeyMyState = "my_agent_state"
type MyAgentState struct {
CurrentStep int
Results []Result
// Keep separate histories for multi-agent
AgentAHistory []llms.MessageContent `json:"-"` // Exclude from serialization
}
// getState returns the agent-local state stored in st.Metadata under
// MetadataKeyMyState, lazily initializing the Metadata map and the state
// value on first access so callers never see nil.
// NOTE(review): after a checkpoint round-trip the stored value may no longer
// be a *MyAgentState (it could deserialize as a plain map); the failed type
// assertion below would then silently replace it with a fresh zero state —
// confirm against the checkpointer's serialization behavior.
func getState(st *state.State) *MyAgentState {
if st.Metadata == nil { st.Metadata = make(map[string]interface{}) }
if v, ok := st.Metadata[MetadataKeyMyState]; ok {
if s, ok := v.(*MyAgentState); ok { return s }
}
s := &MyAgentState{}
st.Metadata[MetadataKeyMyState] = s
return s
}state.Historystate.Historytype MultiAgentState struct {
DirectorHistory []llms.MessageContent // Director's conversation
CharacterHistory map[string][]llms.MessageContent // Per-character conversations
}Prebuilt Agent: Useto enable automatic context compression. The agent's built-inagent.WithContextWindow(N)preserves system messages and keeps the last N non-system messages. Manual trimming is only needed for custom flows.contextCompressHook
// TrimHistory compresses the conversation once it exceeds maxLen messages:
// it keeps the first message (the system prompt) plus the last keepRecent
// messages and drops everything in between. Because keepRecent < maxLen,
// the preserved head and tail never overlap.
func (s *MyState) TrimHistory() {
const maxLen = 10
const keepRecent = 5
if len(s.History) <= maxLen { return }
preserved := []llms.MessageContent{s.History[0]} // Keep system prompt
// Optionally keep important anchors (e.g., task definition)
startIdx := len(s.History) - keepRecent
preserved = append(preserved, s.History[startIdx:]...)
s.History = preserved
}const maxContentChars = 500
const maxResultsInResponse = 4
// In tool execution:
if len([]rune(content)) > maxContentChars {
content = string([]rune(content)[:maxContentChars]) + "...[content truncated]"
}
// Store full results in Metadata for other nodes
currentState.Metadata["full_results"] = fullResultsfunc trimToolResponsesInHistory(history []llms.MessageContent) {
// trimToolResponsesInHistory caps every tool-response payload in the history
// at maxToolResponseChars runes (not bytes), appending a truncation marker.
// Truncation slices []rune so multi-byte UTF-8 characters are never split.
// Mutation happens through the shared Parts backing array (msg.Parts[j] = resp);
// history[i] = msg re-stores the value-copied message for clarity.
for i, msg := range history {
// Only tool messages carry ToolCallResponse parts worth trimming.
if msg.Role != llms.ChatMessageTypeTool { continue }
for j, part := range msg.Parts {
resp, ok := part.(llms.ToolCallResponse)
if !ok { continue }
if len([]rune(resp.Content)) > maxToolResponseChars {
resp.Content = string([]rune(resp.Content)[:maxToolResponseChars]) + "\n...[truncated]"
msg.Parts[j] = resp
}
}
history[i] = msg
}
}[SYSTEM MESSAGE - Static, cacheable]
- Role definition
- Rules and constraints
- Output format specification
- Few-shot examples
[HUMAN MESSAGE - Dynamic, per-request]
- Current task/input
- Context-specific instructions// XML tags for structure
prompt := `<rules>
CRITICAL: You MUST respond in valid JSON format.
</rules>
<context>
... long context here ...
</context>
<task>
Generate the output based on the rules above.
</task>`
// Ephemeral reminders (appended to messages but NOT saved to history)
messagesForCall := append(history, llms.MessageContent{
Role: llms.ChatMessageTypeHuman,
Parts: []llms.ContentPart{llms.TextContent{Text: "REMINDER: Respond in valid JSON only."}},
})<rules><context><output>IMPORTANT:CRITICAL:MUSTToolCallResponsetools.IToolflowcontract.Interrupt()Run()func (n *ApprovalNode) Run(ctx context.Context, currentState *state.State, _ flowcontract.StreamFunc) error {
// Prepare payload describing what approval is needed
currentState.SetInterruptPayload(map[string]any{
"question": "Do you approve this action?",
"details": actionDetails,
})
return flowcontract.Interrupt(currentState.Metadata["interrupt_payload"])
}finalState, err := compiledFlow.Exec(ctx, initialState, streamFunc)
if interruptErr, ok := flowcontract.IsInterrupt(err); ok {
// Flow paused — interruptErr.Payload contains the interrupt payload
// Present to user, collect input, then resume
}finalState, err := compiledFlow.ResumeWithValue(ctx, threadID, userResponse, streamFunc)state.GetResumeValue()func (n *ApprovalNode) Run(ctx context.Context, currentState *state.State, _ flowcontract.StreamFunc) error {
// Check if we're resuming from an interrupt
if resumeValue := currentState.GetResumeValue(); resumeValue != nil {
approval := resumeValue.(string)
if approval == "approved" {
// Proceed with the action
return nil
}
// Handle rejection
return xerror.New("action rejected by user")
}
// First visit — interrupt for approval
currentState.SetInterruptPayload(map[string]any{"question": "Approve?"})
return flowcontract.Interrupt(currentState.Metadata["interrupt_payload"])
}Checkpointercheckpointer.NewRedisCheckpointer()checkpointer.NewInMemoryCheckpointer()threadIDtype MyChain struct {
llm llms.Model
memory schema.Memory
}
func (c *MyChain) Call(ctx context.Context, inputs map[string]any, options ...chains.ChainCallOption) (map[string]any, error) {
// Format prompt from template
// Call LLM
// Parse and return output
}
func (c *MyChain) GetMemory() schema.Memory { return c.memory }
func (c *MyChain) GetInputKeys() []string { return []string{"input_key"} }
func (c *MyChain) GetOutputKeys() []string { return []string{"output_key"} }flow.Exec// In domain service constructor
overviewAgent, err := overviewagent.NewOverviewAgent(
overviewagent.WithLogger(logger),
overviewagent.WithLLM(llm),
overviewagent.WithConfig(config.LLM),
)
// In domain service method
initialState := state.State{
Metadata: map[string]interface{}{
overviewagent.MetadataKeyState: myState,
},
History: []llms.MessageContent{},
}
finalState, err := s.overviewAgent.Exec(ctx, initialState, streamCallback)
// Extract results from finalState.Metadata
result := finalState.Metadata[overviewagent.MetadataKeyState].(*MyState)Preferfor standard agents. The functional options pattern below is for custom flows or wrapping the prebuilt agent in a domain-specific factory.agent.NewAgent()
type Opt struct {
logger logger.ILogger
llm llms.Model
config *config.LLMConfig
}
type Option func(*Opt)
func WithLogger(l logger.ILogger) Option { return func(o *Opt) { o.logger = l } }
func WithLLM(l llms.Model) Option { return func(o *Opt) { o.llm = l } }
func WithConfig(c *config.LLMConfig) Option { return func(o *Opt) { o.config = c } }
func NewMyAgent(options ...Option) (*flow.Flow, error) {
opts := &Opt{}
for _, o := range options { o(opts) }
// Validate required options
if opts.logger == nil { return nil, xerror.New("logger is required") }
if opts.llm == nil { return nil, xerror.New("llm is required") }
// Build and compile flow
return flow.NewFlowBuilder(opts.logger).
SetName("my_agent").
// ... AddNode, AddEdge ...
Compile()
}import (
// golanggraph core
"github.com/futurxlab/golanggraph/flow" // flow.NewFlowBuilder, flow.Flow, flow.StartNode, flow.EndNode
"github.com/futurxlab/golanggraph/edge" // edge.Edge
"github.com/futurxlab/golanggraph/state" // state.State
"github.com/futurxlab/golanggraph/checkpointer" // checkpointer.NewInMemoryCheckpointer(), NewRedisCheckpointer()
flowcontract "github.com/futurxlab/golanggraph/contract" // StreamFunc, FlowStreamEvent, Node, Interrupt(), IsInterrupt()
// kiwi-lib (shared utilities — moved from golanggraph)
"github.com/Yet-Another-AI-Project/kiwi-lib/logger" // logger.ILogger
"github.com/Yet-Another-AI-Project/kiwi-lib/xerror" // xerror.Wrap, xerror.New
// golanggraph prebuilt
"github.com/futurxlab/golanggraph/prebuilt/agent" // agent.NewAgent (RECOMMENDED)
"github.com/futurxlab/golanggraph/prebuilt/node/model" // model.NewModelNode (replaces chat.NewChatNode)
"github.com/futurxlab/golanggraph/prebuilt/node/tools" // tools.NewTools, tools.ITool
"github.com/futurxlab/golanggraph/prebuilt/edge/toolcondition" // toolcondition.NewToolCondition
// langchaingo
"github.com/tmc/langchaingo/llms" // llms.Model, MessageContent, Tool, ToolCall
"github.com/tmc/langchaingo/prompts" // prompts.NewPromptTemplate
"github.com/tmc/langchaingo/chains" // chains (for chain pattern)
"github.com/tmc/langchaingo/memory" // memory.NewSimple()
"github.com/tmc/langchaingo/schema" // schema.Memory
)// agent/utils/tool_utils.go
func HasToolCalls(choice *llms.ContentChoice) bool
func HasToolCallsInHistory(st state.State) bool
func CreateToolCallMessage(choice *llms.ContentChoice) llms.MessageContent
// Reusable format reminders
var JsonReminder = llms.MessageContent{
Role: llms.ChatMessageTypeHuman,
Parts: []llms.ContentPart{llms.TextContent{Text: "IMPORTANT: Respond ONLY with valid JSON."}},
}agent.NewAgent()Name()Run()flowcontract.Nodereturn errxerror.Wrap(err)kiwi-lib/xerrorflow.StartNodeflow.EndNodeWithContextWindow()Run()Run(ctx, llms.ToolCall) (llms.ToolCallResponse, error)//go:embedConditionalToagent.WithMaxToolCalls()streamFuncCheckpointerloggerxerrorkiwi-libgolanggraph