diff --git a/internal/runner/runner.go b/internal/runner/runner.go index 662a1df..d6c0f20 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -117,6 +117,7 @@ type Result struct { func (r *Runner) Run(ctx context.Context) (Result, error) { args := r.buildCommand() cmd := exec.CommandContext(ctx, r.Shell, args...) + cmd.Env = append(os.Environ(), "WATCHR=1") stdout, err := cmd.StdoutPipe() if err != nil { @@ -166,61 +167,140 @@ func (r *Runner) Run(ctx context.Context) (Result, error) { return Result{Lines: lines, ExitCode: exitCode}, nil } -// RunStreaming executes the command and streams output lines to the callback -// The callback is called for each line as it arrives -func (r *Runner) RunStreaming(ctx context.Context, lines *[]Line, mu *sync.RWMutex) error { - args := r.buildCommand() - cmd := exec.CommandContext(ctx, r.Shell, args...) +// StreamingResult holds the state of a streaming command +type StreamingResult struct { + Lines *[]Line + ExitCode int + Done bool + Error error + mu sync.RWMutex +} - stdout, err := cmd.StdoutPipe() - if err != nil { - return fmt.Errorf("failed to create stdout pipe: %w", err) +// GetLines returns a copy of the current lines (thread-safe) +func (s *StreamingResult) GetLines() []Line { + s.mu.RLock() + defer s.mu.RUnlock() + if s.Lines == nil { + return nil + } + result := make([]Line, len(*s.Lines)) + copy(result, *s.Lines) + return result +} + +// LineCount returns the current number of lines (thread-safe) +func (s *StreamingResult) LineCount() int { + s.mu.RLock() + defer s.mu.RUnlock() + if s.Lines == nil { + return 0 + } + return len(*s.Lines) +} + +// IsDone returns whether the command has finished (thread-safe) +func (s *StreamingResult) IsDone() bool { + s.mu.RLock() + defer s.mu.RUnlock() + return s.Done +} + +// RunStreaming executes the command and streams output lines in the background. +// Returns a StreamingResult that can be polled for updates. +// The command runs until ctx is cancelled or it completes naturally. +func (r *Runner) RunStreaming(ctx context.Context) *StreamingResult { + result := &StreamingResult{ + Lines: &[]Line{}, + ExitCode: -1, + Done: false, } - stderr, err := cmd.StderrPipe() - if err != nil { - return fmt.Errorf("failed to create stderr pipe: %w", err) - } + go func() { + args := r.buildCommand() + cmd := exec.CommandContext(ctx, r.Shell, args...) + cmd.Env = append(os.Environ(), "WATCHR=1") - if err := cmd.Start(); err != nil { - return fmt.Errorf("failed to start command: %w", err) - } - - lineNum := 1 - - // Read from both stdout and stderr concurrently - var wg sync.WaitGroup - wg.Add(2) - - readPipe := func(pipe io.Reader) { - defer wg.Done() - scanner := bufio.NewScanner(pipe) - for scanner.Scan() { - mu.Lock() - *lines = append(*lines, Line{ - Number: lineNum, - Content: scanner.Text(), - }) - lineNum++ - mu.Unlock() + stdout, err := cmd.StdoutPipe() + if err != nil { + result.mu.Lock() + result.Error = fmt.Errorf("failed to create stdout pipe: %w", err) + result.Done = true + result.mu.Unlock() + return } - } - go readPipe(stdout) - go readPipe(stderr) + stderr, err := cmd.StderrPipe() + if err != nil { + result.mu.Lock() + result.Error = fmt.Errorf("failed to create stderr pipe: %w", err) + result.Done = true + result.mu.Unlock() + return + } - wg.Wait() + if err := cmd.Start(); err != nil { + result.mu.Lock() + result.Error = fmt.Errorf("failed to start command: %w", err) + result.Done = true + result.mu.Unlock() + return + } - // Wait for command to finish (ignore exit code - we still want to show output) - _ = cmd.Wait() + lineNum := 1 + var lineNumMu sync.Mutex - return nil + // Read from both stdout and stderr concurrently + var wg sync.WaitGroup + wg.Add(2) + + readPipe := func(pipe io.Reader) { + defer wg.Done() + scanner := bufio.NewScanner(pipe) + for scanner.Scan() { + lineNumMu.Lock() + currentLineNum := lineNum + lineNum++ + lineNumMu.Unlock() + + result.mu.Lock() + *result.Lines = append(*result.Lines, Line{ + Number: currentLineNum, + Content: scanner.Text(), + }) + result.mu.Unlock() + } + } + + go readPipe(stdout) + go readPipe(stderr) + + wg.Wait() + + // Wait for command to finish and get exit code + exitCode := 0 + if err := cmd.Wait(); err != nil { + if exitErr, ok := err.(*exec.ExitError); ok { + exitCode = exitErr.ExitCode() + } else if ctx.Err() != nil { + // Context was cancelled + exitCode = -1 + } + } + + result.mu.Lock() + result.ExitCode = exitCode + result.Done = true + result.mu.Unlock() + }() + + return result } // RunSimple executes the command and returns output as string slice func (r *Runner) RunSimple(ctx context.Context) ([]string, error) { args := r.buildCommand() cmd := exec.CommandContext(ctx, r.Shell, args...) + cmd.Env = append(os.Environ(), "WATCHR=1") output, err := cmd.CombinedOutput() if err != nil { // Still return output even on error (non-zero exit) diff --git a/internal/ui/ui.go b/internal/ui/ui.go index f69980b..bb62ea4 100644 --- a/internal/ui/ui.go +++ b/internal/ui/ui.go @@ -39,25 +39,28 @@ type Config struct { // model represents the application state type model struct { - config Config - lines []runner.Line - filtered []int // indices into lines that match filter - cursor int // cursor position in filtered list - offset int // scroll offset for visible window - filter string - filterMode bool - showPreview bool - showHelp bool // help overlay visible - width int - height int - runner *runner.Runner - ctx context.Context - cancel context.CancelFunc - loading bool - spinnerFrame int // current spinner animation frame - errorMsg string - statusMsg string // temporary status message (e.g., "Yanked!") - exitCode int // last command exit code + config Config + lines []runner.Line + filtered []int // indices into lines that match filter + cursor int // cursor position in filtered list + offset int // scroll offset for visible window + filter string + filterMode bool + showPreview bool + showHelp bool // help overlay visible + width int + height int + runner *runner.Runner + ctx context.Context + cancel context.CancelFunc + loading bool + streaming bool // true while command is running (streaming output) + streamResult *runner.StreamingResult // current streaming result + lastLineCount int // track line count for updates + spinnerFrame int // current spinner animation frame + errorMsg string + statusMsg string // temporary status message (e.g., "Yanked!") + exitCode int // last command exit code } // messages @@ -69,6 +72,8 @@ type errMsg struct{ err error } type tickMsg time.Time type clearStatusMsg struct{} type spinnerTickMsg time.Time +type streamTickMsg time.Time // periodic check for streaming updates +type startStreamMsg struct{} // trigger to start streaming // Spinner frames for the loading animation var spinnerFrames = []string{"⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"} @@ -125,8 +130,11 @@ func initialModel(cfg Config) model { } } -func (m model) Init() tea.Cmd { - return tea.Batch(m.runCommand(), m.spinnerTickCmd()) +func (m *model) Init() tea.Cmd { + // Send a message to start streaming (handled in Update with pointer receiver) + return func() tea.Msg { + return startStreamMsg{} + } } func (m model) spinnerTickCmd() tea.Cmd { @@ -135,19 +143,30 @@ func (m model) spinnerTickCmd() tea.Cmd { }) } -func (m model) runCommand() tea.Cmd { - r := m.runner - ctx := m.ctx - return func() tea.Msg { - result, err := r.Run(ctx) - if err != nil { - return errMsg{err} - } - return resultMsg{lines: result.Lines, exitCode: result.ExitCode} - } +func (m model) streamTickCmd() tea.Cmd { + return tea.Tick(50*time.Millisecond, func(t time.Time) tea.Msg { + return streamTickMsg(t) + }) } -func (m model) Update(msg tea.Msg) (tea.Model, tea.Cmd) { +func (m *model) startStreaming() tea.Cmd { + // Cancel any existing context and create a new one + if m.cancel != nil { + m.cancel() + } + m.ctx, m.cancel = context.WithCancel(context.Background()) + + m.streamResult = m.runner.RunStreaming(m.ctx) + m.streaming = true + m.loading = true + m.lastLineCount = 0 + m.exitCode = -1 + m.errorMsg = "" + + return m.streamTickCmd() +} + +func (m *model) Update(msg tea.Msg) (tea.Model, tea.Cmd) { switch msg := msg.(type) { case tea.KeyMsg: return m.handleKeyPress(msg) @@ -157,27 +176,64 @@ func (m model) Update(msg tea.Msg) (tea.Model, tea.Cmd) { m.height = msg.Height return m, nil + case startStreamMsg: + cmd := m.startStreaming() + return m, tea.Batch(cmd, m.spinnerTickCmd()) + case resultMsg: m.lines = msg.lines m.exitCode = msg.exitCode m.loading = false + m.streaming = false m.updateFiltered() return m, nil + case streamTickMsg: + if m.streamResult == nil { + return m, nil + } + + // Check for new lines + newLines := m.streamResult.GetLines() + newCount := len(newLines) + + if newCount != m.lastLineCount { + m.lines = newLines + m.lastLineCount = newCount + m.updateFiltered() + } + + // Check if command completed + if m.streamResult.IsDone() { + m.streaming = false + m.loading = false + m.exitCode = m.streamResult.ExitCode + if m.streamResult.Error != nil { + m.errorMsg = m.streamResult.Error.Error() + } + + // If auto-refresh is enabled, schedule the next run + if m.config.RefreshSeconds > 0 { + return m, m.tickCmd() + } + return m, nil + } + + // Continue streaming + return m, m.streamTickCmd() + case tickMsg: - if m.config.RefreshSeconds > 0 { - m.loading = true - return m, tea.Batch( - m.runCommand(), - m.tickCmd(), - m.spinnerTickCmd(), - ) + if m.config.RefreshSeconds > 0 && !m.streaming { + // Restart streaming for refresh + cmd := m.startStreaming() + return m, tea.Batch(cmd, m.spinnerTickCmd()) } return m, nil case errMsg: m.errorMsg = msg.Error() m.loading = false + m.streaming = false return m, nil case clearStatusMsg: @@ -185,7 +241,7 @@ func (m model) Update(msg tea.Msg) (tea.Model, tea.Cmd) { return m, nil case spinnerTickMsg: - if m.loading { + if m.loading || m.streaming { m.spinnerFrame = (m.spinnerFrame + 1) % len(spinnerFrames) return m, m.spinnerTickCmd() } @@ -267,8 +323,9 @@ func (m *model) handleKeyPress(msg tea.KeyMsg) (tea.Model, tea.Cmd) { m.showPreview = !m.showPreview m.adjustOffset() // Keep selected line visible after preview toggle case "r", "ctrl+r": - m.loading = true - return m, tea.Batch(m.runCommand(), m.spinnerTickCmd()) + // Restart streaming + cmd := m.startStreaming() + return m, tea.Batch(cmd, m.spinnerTickCmd()) case "/": m.filterMode = true m.filter = "" @@ -668,7 +725,7 @@ func overlayBox(base string, box string, boxWidth, boxHeight, screenWidth, scree return strings.Join(baseLines, "\n") } -func (m model) View() string { +func (m *model) View() string { if m.width == 0 || m.height == 0 { return spinnerFrames[m.spinnerFrame] + " Running command…" } @@ -763,6 +820,10 @@ func (m model) renderMainView() string { var commandLine string switch { + case m.streaming: + // Streaming - show streaming indicator + streamStyle := lipgloss.NewStyle().Foreground(lipgloss.Color("14")) // cyan + commandLine = prefix + streamStyle.Render("◉ "+m.config.Command) case m.loading: // Still loading - no status yet commandLine = prefix + m.config.Command @@ -786,7 +847,9 @@ func (m model) renderMainView() string { default: promptLine = promptStyle.Render(m.config.Prompt) } - if m.loading { + if m.streaming { + promptLine += " " + spinnerFrames[m.spinnerFrame] + " Streaming…" + } else if m.loading { promptLine += " " + spinnerFrames[m.spinnerFrame] + " Running command…" } if m.statusMsg != "" { @@ -1065,7 +1128,7 @@ func Run(cfg Config) error { } m := initialModel(cfg) - p := tea.NewProgram(m, tea.WithAltScreen()) + p := tea.NewProgram(&m, tea.WithAltScreen()) _, err := p.Run() return err