feat: properly support streaming commands

This commit is contained in:
2025-12-09 01:24:21 +02:00
parent 9ecf9f74b7
commit 66c6599506
2 changed files with 228 additions and 85 deletions

View File

@@ -117,6 +117,7 @@ type Result struct {
func (r *Runner) Run(ctx context.Context) (Result, error) {
args := r.buildCommand()
cmd := exec.CommandContext(ctx, r.Shell, args...)
cmd.Env = append(os.Environ(), "WATCHR=1")
stdout, err := cmd.StdoutPipe()
if err != nil {
@@ -166,61 +167,140 @@ func (r *Runner) Run(ctx context.Context) (Result, error) {
return Result{Lines: lines, ExitCode: exitCode}, nil
}
// RunStreaming executes the command and streams output lines to the callback
// The callback is called for each line as it arrives
func (r *Runner) RunStreaming(ctx context.Context, lines *[]Line, mu *sync.RWMutex) error {
args := r.buildCommand()
cmd := exec.CommandContext(ctx, r.Shell, args...)
// StreamingResult holds the state of a streaming command
type StreamingResult struct {
Lines *[]Line
ExitCode int
Done bool
Error error
mu sync.RWMutex
}
stdout, err := cmd.StdoutPipe()
if err != nil {
return fmt.Errorf("failed to create stdout pipe: %w", err)
// GetLines returns a copy of the current lines (thread-safe)
func (s *StreamingResult) GetLines() []Line {
s.mu.RLock()
defer s.mu.RUnlock()
if s.Lines == nil {
return nil
}
result := make([]Line, len(*s.Lines))
copy(result, *s.Lines)
return result
}
// LineCount returns the current number of lines (thread-safe)
func (s *StreamingResult) LineCount() int {
s.mu.RLock()
defer s.mu.RUnlock()
if s.Lines == nil {
return 0
}
return len(*s.Lines)
}
// IsDone returns whether the command has finished (thread-safe)
func (s *StreamingResult) IsDone() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.Done
}
// RunStreaming executes the command and streams output lines in the background.
// Returns a StreamingResult that can be polled for updates.
// The command runs until ctx is cancelled or it completes naturally.
func (r *Runner) RunStreaming(ctx context.Context) *StreamingResult {
result := &StreamingResult{
Lines: &[]Line{},
ExitCode: -1,
Done: false,
}
stderr, err := cmd.StderrPipe()
if err != nil {
return fmt.Errorf("failed to create stderr pipe: %w", err)
}
go func() {
args := r.buildCommand()
cmd := exec.CommandContext(ctx, r.Shell, args...)
cmd.Env = append(os.Environ(), "WATCHR=1")
if err := cmd.Start(); err != nil {
return fmt.Errorf("failed to start command: %w", err)
}
lineNum := 1
// Read from both stdout and stderr concurrently
var wg sync.WaitGroup
wg.Add(2)
readPipe := func(pipe io.Reader) {
defer wg.Done()
scanner := bufio.NewScanner(pipe)
for scanner.Scan() {
mu.Lock()
*lines = append(*lines, Line{
Number: lineNum,
Content: scanner.Text(),
})
lineNum++
mu.Unlock()
stdout, err := cmd.StdoutPipe()
if err != nil {
result.mu.Lock()
result.Error = fmt.Errorf("failed to create stdout pipe: %w", err)
result.Done = true
result.mu.Unlock()
return
}
}
go readPipe(stdout)
go readPipe(stderr)
stderr, err := cmd.StderrPipe()
if err != nil {
result.mu.Lock()
result.Error = fmt.Errorf("failed to create stderr pipe: %w", err)
result.Done = true
result.mu.Unlock()
return
}
wg.Wait()
if err := cmd.Start(); err != nil {
result.mu.Lock()
result.Error = fmt.Errorf("failed to start command: %w", err)
result.Done = true
result.mu.Unlock()
return
}
// Wait for command to finish (ignore exit code - we still want to show output)
_ = cmd.Wait()
lineNum := 1
var lineNumMu sync.Mutex
return nil
// Read from both stdout and stderr concurrently
var wg sync.WaitGroup
wg.Add(2)
readPipe := func(pipe io.Reader) {
defer wg.Done()
scanner := bufio.NewScanner(pipe)
for scanner.Scan() {
lineNumMu.Lock()
currentLineNum := lineNum
lineNum++
lineNumMu.Unlock()
result.mu.Lock()
*result.Lines = append(*result.Lines, Line{
Number: currentLineNum,
Content: scanner.Text(),
})
result.mu.Unlock()
}
}
go readPipe(stdout)
go readPipe(stderr)
wg.Wait()
// Wait for command to finish and get exit code
exitCode := 0
if err := cmd.Wait(); err != nil {
if exitErr, ok := err.(*exec.ExitError); ok {
exitCode = exitErr.ExitCode()
} else if ctx.Err() != nil {
// Context was cancelled
exitCode = -1
}
}
result.mu.Lock()
result.ExitCode = exitCode
result.Done = true
result.mu.Unlock()
}()
return result
}
// RunSimple executes the command and returns output as string slice
func (r *Runner) RunSimple(ctx context.Context) ([]string, error) {
args := r.buildCommand()
cmd := exec.CommandContext(ctx, r.Shell, args...)
cmd.Env = append(os.Environ(), "WATCHR=1")
output, err := cmd.CombinedOutput()
if err != nil {
// Still return output even on error (non-zero exit)

View File

@@ -39,25 +39,28 @@ type Config struct {
// model represents the application state
type model struct {
config Config
lines []runner.Line
filtered []int // indices into lines that match filter
cursor int // cursor position in filtered list
offset int // scroll offset for visible window
filter string
filterMode bool
showPreview bool
showHelp bool // help overlay visible
width int
height int
runner *runner.Runner
ctx context.Context
cancel context.CancelFunc
loading bool
spinnerFrame int // current spinner animation frame
errorMsg string
statusMsg string // temporary status message (e.g., "Yanked!")
exitCode int // last command exit code
config Config
lines []runner.Line
filtered []int // indices into lines that match filter
cursor int // cursor position in filtered list
offset int // scroll offset for visible window
filter string
filterMode bool
showPreview bool
showHelp bool // help overlay visible
width int
height int
runner *runner.Runner
ctx context.Context
cancel context.CancelFunc
loading bool
streaming bool // true while command is running (streaming output)
streamResult *runner.StreamingResult // current streaming result
lastLineCount int // track line count for updates
spinnerFrame int // current spinner animation frame
errorMsg string
statusMsg string // temporary status message (e.g., "Yanked!")
exitCode int // last command exit code
}
// messages
@@ -69,6 +72,8 @@ type errMsg struct{ err error }
type tickMsg time.Time
type clearStatusMsg struct{}
type spinnerTickMsg time.Time
type streamTickMsg time.Time // periodic check for streaming updates
type startStreamMsg struct{} // trigger to start streaming
// Spinner frames for the loading animation
var spinnerFrames = []string{"⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"}
@@ -125,8 +130,11 @@ func initialModel(cfg Config) model {
}
}
func (m model) Init() tea.Cmd {
return tea.Batch(m.runCommand(), m.spinnerTickCmd())
func (m *model) Init() tea.Cmd {
// Send a message to start streaming (handled in Update with pointer receiver)
return func() tea.Msg {
return startStreamMsg{}
}
}
func (m model) spinnerTickCmd() tea.Cmd {
@@ -135,19 +143,30 @@ func (m model) spinnerTickCmd() tea.Cmd {
})
}
func (m model) runCommand() tea.Cmd {
r := m.runner
ctx := m.ctx
return func() tea.Msg {
result, err := r.Run(ctx)
if err != nil {
return errMsg{err}
}
return resultMsg{lines: result.Lines, exitCode: result.ExitCode}
}
func (m model) streamTickCmd() tea.Cmd {
return tea.Tick(50*time.Millisecond, func(t time.Time) tea.Msg {
return streamTickMsg(t)
})
}
func (m model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
func (m *model) startStreaming() tea.Cmd {
// Cancel any existing context and create a new one
if m.cancel != nil {
m.cancel()
}
m.ctx, m.cancel = context.WithCancel(context.Background())
m.streamResult = m.runner.RunStreaming(m.ctx)
m.streaming = true
m.loading = true
m.lastLineCount = 0
m.exitCode = -1
m.errorMsg = ""
return m.streamTickCmd()
}
func (m *model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
switch msg := msg.(type) {
case tea.KeyMsg:
return m.handleKeyPress(msg)
@@ -157,27 +176,64 @@ func (m model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
m.height = msg.Height
return m, nil
case startStreamMsg:
cmd := m.startStreaming()
return m, tea.Batch(cmd, m.spinnerTickCmd())
case resultMsg:
m.lines = msg.lines
m.exitCode = msg.exitCode
m.loading = false
m.streaming = false
m.updateFiltered()
return m, nil
case streamTickMsg:
if m.streamResult == nil {
return m, nil
}
// Check for new lines
newLines := m.streamResult.GetLines()
newCount := len(newLines)
if newCount != m.lastLineCount {
m.lines = newLines
m.lastLineCount = newCount
m.updateFiltered()
}
// Check if command completed
if m.streamResult.IsDone() {
m.streaming = false
m.loading = false
m.exitCode = m.streamResult.ExitCode
if m.streamResult.Error != nil {
m.errorMsg = m.streamResult.Error.Error()
}
// If auto-refresh is enabled, schedule the next run
if m.config.RefreshSeconds > 0 {
return m, m.tickCmd()
}
return m, nil
}
// Continue streaming
return m, m.streamTickCmd()
case tickMsg:
if m.config.RefreshSeconds > 0 {
m.loading = true
return m, tea.Batch(
m.runCommand(),
m.tickCmd(),
m.spinnerTickCmd(),
)
if m.config.RefreshSeconds > 0 && !m.streaming {
// Restart streaming for refresh
cmd := m.startStreaming()
return m, tea.Batch(cmd, m.spinnerTickCmd())
}
return m, nil
case errMsg:
m.errorMsg = msg.Error()
m.loading = false
m.streaming = false
return m, nil
case clearStatusMsg:
@@ -185,7 +241,7 @@ func (m model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
return m, nil
case spinnerTickMsg:
if m.loading {
if m.loading || m.streaming {
m.spinnerFrame = (m.spinnerFrame + 1) % len(spinnerFrames)
return m, m.spinnerTickCmd()
}
@@ -267,8 +323,9 @@ func (m *model) handleKeyPress(msg tea.KeyMsg) (tea.Model, tea.Cmd) {
m.showPreview = !m.showPreview
m.adjustOffset() // Keep selected line visible after preview toggle
case "r", "ctrl+r":
m.loading = true
return m, tea.Batch(m.runCommand(), m.spinnerTickCmd())
// Restart streaming
cmd := m.startStreaming()
return m, tea.Batch(cmd, m.spinnerTickCmd())
case "/":
m.filterMode = true
m.filter = ""
@@ -668,7 +725,7 @@ func overlayBox(base string, box string, boxWidth, boxHeight, screenWidth, scree
return strings.Join(baseLines, "\n")
}
func (m model) View() string {
func (m *model) View() string {
if m.width == 0 || m.height == 0 {
return spinnerFrames[m.spinnerFrame] + " Running command…"
}
@@ -763,6 +820,10 @@ func (m model) renderMainView() string {
var commandLine string
switch {
case m.streaming:
// Streaming - show streaming indicator
streamStyle := lipgloss.NewStyle().Foreground(lipgloss.Color("14")) // cyan
commandLine = prefix + streamStyle.Render("◉ "+m.config.Command)
case m.loading:
// Still loading - no status yet
commandLine = prefix + m.config.Command
@@ -786,7 +847,9 @@ func (m model) renderMainView() string {
default:
promptLine = promptStyle.Render(m.config.Prompt)
}
if m.loading {
if m.streaming {
promptLine += " " + spinnerFrames[m.spinnerFrame] + " Streaming…"
} else if m.loading {
promptLine += " " + spinnerFrames[m.spinnerFrame] + " Running command…"
}
if m.statusMsg != "" {
@@ -1065,7 +1128,7 @@ func Run(cfg Config) error {
}
m := initialModel(cfg)
p := tea.NewProgram(m, tea.WithAltScreen())
p := tea.NewProgram(&m, tea.WithAltScreen())
_, err := p.Run()
return err