package main

import (
	"bufio"
	"encoding/json"
	"io"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/fsnotify/fsnotify"
)

// EventMsg is sent to the Bubble Tea model when a new event is parsed.
type EventMsg struct {
	Event Event
	File  string
}

// InitialEventsMsg is sent once with all tail events at startup.
type InitialEventsMsg struct {
	Events []Event
}

// NewFileMsg is sent when a new JSONL file is discovered.
type NewFileMsg struct {
	File string
}

// WatcherErrorMsg is sent when the watcher encounters an error.
type WatcherErrorMsg struct {
	Err error
}

// defaultTailLines is the number of lines to read from the end of each
// existing file on startup.
const defaultTailLines = 200

// Watcher watches a directory for JSONL files and tails them.
type Watcher struct {
	dir     string
	eventCh chan EventMsg
	done    chan struct{}

	// Initial batch of events read from file tails, populated once by
	// Start before initialDone is closed.
	initialEvents []Event
	initialDone   chan struct{}

	mu      sync.Mutex          // guards tailing
	tailing map[string]struct{} // paths that currently have a tail goroutine

	fsw *fsnotify.Watcher
}

// NewWatcher creates a new directory watcher for dir.
func NewWatcher(dir string) *Watcher {
	return &Watcher{
		dir:         dir,
		eventCh:     make(chan EventMsg, 256),
		done:        make(chan struct{}),
		initialDone: make(chan struct{}),
		tailing:     make(map[string]struct{}),
	}
}

// Start begins watching and tailing. It synchronously reads the tail of
// every existing *.jsonl file in the directory (see InitialEvents), then
// spawns one goroutine per file to follow new writes and one goroutine
// to discover newly created files.
func (w *Watcher) Start() error {
	var err error
	w.fsw, err = fsnotify.NewWatcher()
	if err != nil {
		return err
	}
	if err := w.fsw.Add(w.dir); err != nil {
		w.fsw.Close()
		return err
	}

	// Read the tail of existing files synchronously to build the initial
	// batch. The Glob error is deliberately ignored: the pattern is a
	// fixed literal, so ErrBadPattern (the only possible error) cannot
	// occur.
	matches, _ := filepath.Glob(filepath.Join(w.dir, "*.jsonl"))

	var allInitial []Event
	fileOffsets := make(map[string]int64, len(matches)) // where each tail read stopped
	for _, path := range matches {
		events, endOffset := readTail(path, defaultTailLines)
		allInitial = append(allInitial, events...)
		fileOffsets[path] = endOffset
	}

	// Present the initial batch in chronological order.
	sort.Slice(allInitial, func(i, j int) bool {
		return allInitial[i].parsed.time.Before(allInitial[j].parsed.time)
	})
	w.initialEvents = allInitial

	// Now start tailing from where we left off (for new events only).
	for _, path := range matches {
		w.mu.Lock()
		w.tailing[path] = struct{}{}
		w.mu.Unlock()
		go w.tailFileFrom(path, fileOffsets[path])
	}
	close(w.initialDone)

	// Watch for new files.
	go w.watchLoop()
	return nil
}

// InitialEvents returns the batch of events read from tails at startup,
// sorted by timestamp. Only valid after Start() returns.
func (w *Watcher) InitialEvents() []Event {
	return w.initialEvents
}

// readTail reads the last n lines from a JSONL file and returns the
// parsed events plus the byte offset just past the last complete line
// (for continuing to tail from). Blank lines and lines that fail to
// parse as JSON are skipped.
func readTail(path string, n int) ([]Event, int64) {
	f, err := os.Open(path)
	if err != nil {
		return nil, 0
	}
	defer f.Close()

	// findTailOffset always returns a line boundary (0 or the byte just
	// after a newline), so reading starts on a complete line; there is
	// never a partial first line to skip. (Skipping one here would drop
	// a real line and return only n-1 lines.)
	offset := findTailOffset(f, n)
	if _, err := f.Seek(offset, io.SeekStart); err != nil {
		return nil, 0
	}

	reader := bufio.NewReader(f)
	var events []Event
	consumed := offset // offset just past the last complete line read
	for {
		raw, err := reader.ReadString('\n')
		if err != nil {
			// A torn final line (no trailing newline yet) is left for
			// the tailer to pick up once the writer completes it.
			break
		}
		consumed += int64(len(raw))
		line := strings.TrimSpace(raw)
		if line == "" {
			continue
		}
		var evt Event
		if err := json.Unmarshal([]byte(line), &evt); err != nil {
			continue // tolerate malformed lines (e.g. a torn write)
		}
		parseEvent(&evt)
		events = append(events, evt)
	}

	// Resume offset is the end of the last complete line, not EOF, so a
	// partially written final line is re-read intact by the tailer
	// rather than resumed mid-line.
	return events, consumed
}

// watchLoop reacts to fsnotify events on the watched directory, starting
// a tail for every *.jsonl file that is created or written to. It runs
// until Stop is called or the underlying watcher is closed.
func (w *Watcher) watchLoop() {
	defer w.fsw.Close()
	for {
		select {
		case <-w.done:
			return
		case event, ok := <-w.fsw.Events:
			if !ok {
				return
			}
			if event.Op&(fsnotify.Create|fsnotify.Write) == 0 {
				continue
			}
			if strings.HasSuffix(event.Name, ".jsonl") {
				w.startTail(event.Name)
			}
		case _, ok := <-w.fsw.Errors:
			if !ok {
				return
			}
			// NOTE(review): watcher errors are currently dropped; the
			// WatcherErrorMsg type exists but nothing here emits it.
		}
	}
}

// startTail begins tailing path unless a tail goroutine is already
// running for it. Files discovered after startup are read from the
// beginning.
func (w *Watcher) startTail(path string) {
	w.mu.Lock()
	if _, ok := w.tailing[path]; ok {
		w.mu.Unlock()
		return
	}
	w.tailing[path] = struct{}{}
	w.mu.Unlock()

	// New file discovered after startup — tail from offset zero.
	go w.tailFileFrom(path, 0)
}

// tailFileFrom tails a file starting from the given byte offset, sending
// each parsed event to w.eventCh until the watcher is stopped or the
// file goes away.
func (w *Watcher) tailFileFrom(path string, startOffset int64) {
	// Deregister on exit so a removed-and-recreated file (e.g. log
	// rotation) is picked up again by startTail.
	defer func() {
		w.mu.Lock()
		delete(w.tailing, path)
		w.mu.Unlock()
	}()

	f, err := os.Open(path)
	if err != nil {
		return
	}
	defer f.Close()
	if startOffset > 0 {
		if _, err := f.Seek(startOffset, io.SeekStart); err != nil {
			return
		}
	}

	basename := filepath.Base(path)
	reader := bufio.NewReader(f)
	var partial []byte // head of a torn write, kept across EOF waits
	for {
		select {
		case <-w.done:
			return
		default:
		}

		chunk, err := reader.ReadString('\n')
		if err != nil {
			if err == io.EOF {
				// bufio has already consumed these bytes; keep them so
				// the line is intact once the writer finishes it.
				partial = append(partial, chunk...)
				if !w.waitForChange(path) {
					return
				}
				continue
			}
			return
		}

		line := strings.TrimSpace(string(partial) + chunk)
		partial = partial[:0]
		if line == "" {
			continue
		}
		var evt Event
		if err := json.Unmarshal([]byte(line), &evt); err != nil {
			continue
		}
		parseEvent(&evt)
		select {
		case w.eventCh <- EventMsg{Event: evt, File: basename}:
		case <-w.done:
			return
		}
	}
}

// findTailOffset returns a byte offset to start reading from to get
// approximately the last n lines of the file. The result is always a
// line boundary: either 0 or the byte immediately after a newline.
func findTailOffset(f *os.File, n int) int64 {
	info, err := f.Stat()
	if err != nil {
		return 0
	}
	size := info.Size()
	if size == 0 {
		return 0
	}

	// Scan backwards in fixed-size chunks, counting newlines; the byte
	// after the (n+1)th newline from the end starts the last n lines.
	bufSize := int64(8192)
	newlines := 0
	pos := size
	buf := make([]byte, bufSize)
	for pos > 0 {
		readSize := bufSize
		if pos < readSize {
			readSize = pos
		}
		pos -= readSize
		nRead, err := f.ReadAt(buf[:readSize], pos)
		if err != nil && err != io.EOF {
			return 0
		}
		for i := nRead - 1; i >= 0; i-- {
			if buf[i] == '\n' {
				newlines++
				if newlines > n {
					return pos + int64(i) + 1
				}
			}
		}
	}
	return 0
}

// waitForChange blocks until the file has new data. It returns false
// when the file was removed/renamed, the watcher is stopping, or the
// file can no longer be watched — in all of which cases the caller
// should stop tailing.
func (w *Watcher) waitForChange(path string) bool {
	fw, err := fsnotify.NewWatcher()
	if err != nil {
		// Can't watch at all; back off briefly instead of letting the
		// caller busy-spin on EOF, then let it retry.
		time.Sleep(250 * time.Millisecond)
		return true
	}
	defer fw.Close()
	if err := fw.Add(path); err != nil {
		// Most likely the file disappeared between EOF and Add.
		return false
	}
	for {
		select {
		case <-w.done:
			return false
		case evt, ok := <-fw.Events:
			if !ok {
				return false
			}
			if evt.Op&fsnotify.Write != 0 {
				return true
			}
			if evt.Op&(fsnotify.Remove|fsnotify.Rename) != 0 {
				return false
			}
		case <-fw.Errors:
			return false
		}
	}
}

// Stop stops the watcher.
func (w *Watcher) Stop() {
	// Idempotent: a bare close(w.done) would panic if Stop were called
	// twice (close of closed channel). The existing mutex serializes
	// concurrent Stop calls; the select detects a prior close.
	w.mu.Lock()
	defer w.mu.Unlock()
	select {
	case <-w.done:
		// Already stopped; nothing to do.
	default:
		close(w.done)
	}
}

// Events returns the channel that emits parsed events.
func (w *Watcher) Events() <-chan EventMsg {
	return w.eventCh
}