Skip to main content

Real-time Updates

WebSocket-based real-time data synchronization with Cocobase.

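Table of Contents

  • Overview
  • Basic Usage
  • Connection Management
  • Event Handling
  • Error Handling
  • Advanced Patterns
  • Best Practices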

Overview

Cocobase supports real-time updates via WebSocket connections. When documents in a collection change, your application receives instant notifications.

Features

  • Live Updates: Receive notifications when documents are created, updated, or deleted
  • Event-Driven: React to changes with callback functions
  • Multiple Connections: Watch multiple collections simultaneously
  • Named Connections: Track connections with custom names
  • Automatic Reconnection: Built-in connection management

Event Types

  • create: New document created
  • update: Document updated
  • delete: Document deleted

Basic Usage

Watch a Collection

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"

	"github.com/lordace-coder/cocobase-go/cocobase"
)

// main watches the "users" collection and prints every event it receives
// until the process is interrupted (Ctrl+C).
func main() {
	client := cocobase.NewClient(cocobase.Config{
		APIKey: "your-api-key",
	})

	// Cancel ctx on interrupt so main can return and the deferred Close runs.
	// A bare `select {}` would block forever and the defer would never fire.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	// Start watching collection
	conn, err := client.WatchCollection(ctx, "users", func(event cocobase.Event) {
		fmt.Printf("Event: %s\n", event.Event)
		fmt.Printf("Document ID: %s\n", event.Data.ID)
		fmt.Printf("Data: %+v\n", event.Data.Data)
	}, "users-watcher")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Keep the application running until an interrupt arrives.
	<-ctx.Done()
}

Event Structure

// Event is the payload passed to a watch callback: the kind of change that
// occurred and the document it applies to.
type Event struct {
Event string `json:"event"` // "create", "update", or "delete"
Data Document `json:"data"` // The affected document
}

Document in Event

conn, err := client.WatchCollection(ctx, "users", func(event cocobase.Event) {
// Access event type
fmt.Printf("Event Type: %s\n", event.Event)

// Access document fields
fmt.Printf("Document ID: %s\n", event.Data.ID)
fmt.Printf("Collection: %s\n", event.Data.Collection)
fmt.Printf("Created: %v\n", event.Data.CreatedAt)
fmt.Printf("Updated: %v\n", event.Data.UpdatedAt)

// Access document data
if name, ok := event.Data.Data["name"].(string); ok {
fmt.Printf("Name: %s\n", name)
}
}, "")

Connection Management

Creating Named Connections

// Named connection for tracking
conn, err := client.WatchCollection(ctx, "users", handler, "users-watcher")

// Anonymous connection (auto-generated name)
conn, err := client.WatchCollection(ctx, "orders", handler, "")

Closing Connections

// Open the watcher and abort if the connection cannot be established.
conn, err := client.WatchCollection(ctx, "users", handler, "watcher")
if err != nil {
log.Fatal(err)
}

// Close when done
// Option 1: defer the close so it runs when the enclosing function returns.
defer conn.Close()

// Or close explicitly
// Option 2: close at a point of your choosing and inspect the returned error.
// NOTE(review): options 1 and 2 are shown as alternatives; whether calling
// Close twice is safe is not shown here, so pick one.
err = conn.Close()
if err != nil {
log.Printf("Error closing connection: %v\n", err)
}

Check Connection Status

conn, err := client.WatchCollection(ctx, "users", handler, "watcher")
if err != nil {
	log.Fatal(err)
}

// Report whether the connection is still open.
status := "Connection is active"
if conn.IsClosed() {
	status = "Connection is closed"
}
fmt.Println(status)

Multiple Connections

// Watch multiple collections
// Each collection gets its own connection, handler and connection name.
usersConn, err := client.WatchCollection(ctx, "users", userHandler, "users")
if err != nil {
log.Fatal(err)
}
defer usersConn.Close()

ordersConn, err := client.WatchCollection(ctx, "orders", orderHandler, "orders")
if err != nil {
log.Fatal(err)
}
defer ordersConn.Close()

productsConn, err := client.WatchCollection(ctx, "products", productHandler, "products")
if err != nil {
log.Fatal(err)
}
defer productsConn.Close()

// Keep running
// An empty select blocks this goroutine forever; the deferred Close calls
// above only run if the enclosing function returns.
select {}

Event Handling

Handling Different Event Types

handler := func(event cocobase.Event) {
switch event.Event {
case "create":
handleCreate(event.Data)
case "update":
handleUpdate(event.Data)
case "delete":
handleDelete(event.Data)
default:
fmt.Printf("Unknown event: %s\n", event.Event)
}
}

conn, err := client.WatchCollection(ctx, "users", handler, "watcher")

Type-Specific Handlers

// handleCreate announces a newly created document and greets it by name
// when the "name" field is present and holds a string.
func handleCreate(doc cocobase.Document) {
	fmt.Printf("New document created: %s\n", doc.ID)

	name, isString := doc.Data["name"].(string)
	if !isString {
		return
	}
	fmt.Printf("Welcome, %s!\n", name)
}

// handleUpdate reports an updated document and prints its "status" field
// when that field is present and holds a string.
func handleUpdate(doc cocobase.Document) {
	fmt.Printf("Document updated: %s\n", doc.ID)

	// Nothing more to report without a string status.
	status, isString := doc.Data["status"].(string)
	if !isString {
		return
	}
	fmt.Printf("New status: %s\n", status)
}

// handleDelete reports the ID of a document that was deleted.
func handleDelete(doc cocobase.Document) {
fmt.Printf("Document deleted: %s\n", doc.ID)
}

Filtering Events

// handler ignores every event whose document does not carry
// status == "active" and prints the rest.
handler := func(event cocobase.Event) {
	// A missing or non-string status leaves the zero value "", which is
	// rejected by the same comparison as any other non-active status.
	if status, _ := event.Data.Data["status"].(string); status != "active" {
		return
	}

	// Only active documents reach this point.
	fmt.Printf("Active document %s: %s\n", event.Event, event.Data.ID)
}

conn, err := client.WatchCollection(ctx, "users", handler, "active-users")

UI Updates

// handler mirrors collection changes into a UI list: one branch per event
// type. Event types other than the three below are ignored (no default case).
handler := func(event cocobase.Event) {
switch event.Event {
case "create":
// Add to UI list
addToList(event.Data)

case "update":
// Update UI element
updateInList(event.Data)

case "delete":
// Remove from UI
// Only the document ID is passed on for removal.
removeFromList(event.Data.ID)
}
}

conn, err := client.WatchCollection(ctx, "tasks", handler, "tasks-ui")

Error Handling

Connection Errors

// Attempt to open the watcher; unlike the other examples this one reports
// the failure and keeps going instead of exiting.
conn, err := client.WatchCollection(ctx, "users", handler, "watcher")
if err != nil {
// Check error type
fmt.Printf("Failed to establish connection: %v\n", err)

// Try alternative approach
// ...
// NOTE(review): conn should presumably not be used after this branch
// unless the alternative approach assigns a working connection — confirm.
}

Event Processing Errors

// handler wraps event processing so that a panic raised by processEvent is
// caught and printed instead of propagating out of the callback.
handler := func(event cocobase.Event) {
// recover must be called directly inside the deferred function to catch
// a panic raised later in this handler.
defer func() {
if r := recover(); r != nil {
fmt.Printf("Panic in event handler: %v\n", r)
}
}()

// Process event
processEvent(event)
}

conn, err := client.WatchCollection(ctx, "users", handler, "watcher")

Connection Loss

// handler prints the type of every event received.
handler := func(event cocobase.Event) {
	fmt.Printf("Event: %s\n", event.Event)
}

conn, err := client.WatchCollection(ctx, "users", handler, "watcher")
if err != nil {
	log.Fatal(err)
}

// Monitor connection status every 5 seconds and reconnect when it is closed.
// The goroutine stops when ctx is cancelled, so it does not outlive the
// watcher it supervises.
go func() {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		if !conn.IsClosed() {
			continue
		}
		fmt.Println("Connection lost, attempting to reconnect...")

		// Reconnect; on failure try again on the next tick.
		newConn, err := client.WatchCollection(ctx, "users", handler, "watcher")
		if err != nil {
			fmt.Printf("Reconnection failed: %v\n", err)
			continue
		}

		// conn is written from this goroutine: do not read it from other
		// goroutines without adding synchronization.
		conn = newConn
		fmt.Println("Reconnected successfully")
	}
}()

Advanced Patterns

Debounced Updates

// Debouncer delays a callback until a quiet period has elapsed; every new
// call replaces the previously scheduled callback.
type Debouncer struct {
	mu       sync.Mutex // guards timer
	timer    *time.Timer
	duration time.Duration
}

// NewDebouncer returns a Debouncer whose quiet period is d.
func NewDebouncer(d time.Duration) *Debouncer {
	deb := new(Debouncer)
	deb.duration = d
	return deb
}

// Debounce schedules fn to run once the quiet period has passed, stopping
// any callback that was scheduled earlier and has not run yet.
func (d *Debouncer) Debounce(fn func()) {
	d.mu.Lock()
	if pending := d.timer; pending != nil {
		pending.Stop()
	}
	d.timer = time.AfterFunc(d.duration, fn)
	d.mu.Unlock()
}

// Usage
// One debouncer is shared by all events, so a burst of events results in a
// single updateUI call.
debouncer := NewDebouncer(500 * time.Millisecond)

// handler reschedules the UI refresh on every event; the event itself is
// not passed on to updateUI.
handler := func(event cocobase.Event) {
debouncer.Debounce(func() {
// This will only run if no new events come for 500ms
updateUI()
})
}

conn, err := client.WatchCollection(ctx, "users", handler, "debounced")

Event Buffering

// EventBuffer accumulates events under a mutex so they can be drained and
// processed as a batch.
type EventBuffer struct {
	mu     sync.Mutex // guards events
	events []cocobase.Event
}

// Add appends event to the buffer.
func (b *EventBuffer) Add(event cocobase.Event) {
	b.mu.Lock()
	b.events = append(b.events, event)
	b.mu.Unlock()
}

// Flush returns every buffered event and leaves the buffer empty. The
// returned slice is nil when nothing was buffered.
func (b *EventBuffer) Flush() []cocobase.Event {
	b.mu.Lock()
	drained := b.events
	b.events = nil
	b.mu.Unlock()
	return drained
}

// Usage
buffer := &EventBuffer{}

// handler only records the event; processing happens in batches below.
handler := func(event cocobase.Event) {
	buffer.Add(event)
}

// Flush the buffer once per second until ctx is cancelled. Without the
// ctx.Done case this goroutine would run for the life of the process.
go func() {
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			// Final flush so events buffered since the last tick are not lost.
			if events := buffer.Flush(); len(events) > 0 {
				processBatch(events)
			}
			return
		case <-ticker.C:
			if events := buffer.Flush(); len(events) > 0 {
				processBatch(events)
			}
		}
	}
}()

conn, err := client.WatchCollection(ctx, "users", handler, "buffered")

State Synchronization

// Store keeps an in-memory copy of documents, keyed by document ID, that is
// kept current by feeding it watch events.
type Store struct {
	mu   sync.RWMutex // guards data
	data map[string]cocobase.Document
}

// NewStore returns an empty Store that is ready to receive events.
func NewStore() *Store {
	s := new(Store)
	s.data = make(map[string]cocobase.Document)
	return s
}

// handler applies one event to the store: create and update events insert
// or replace the document, delete events remove it. Other event types leave
// the data untouched. The resulting document count is printed every time.
func (s *Store) handler(event cocobase.Event) {
	s.mu.Lock()
	defer s.mu.Unlock()

	switch id := event.Data.ID; event.Event {
	case "delete":
		delete(s.data, id)
	case "create", "update":
		s.data[id] = event.Data
	}

	fmt.Printf("Store now has %d documents\n", len(s.data))
}

// Get returns the stored document for id and whether it was found.
func (s *Store) Get(id string) (cocobase.Document, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	found, present := s.data[id]
	return found, present
}

// Usage
store := NewStore()

// Every event on "users" is applied to the store through its handler method.
conn, err := client.WatchCollection(ctx, "users", store.handler, "store")
if err != nil {
log.Fatal(err)
}
defer conn.Close()

// Access synchronized data
// The store is filled only from received events, so a document is found
// here only after a create or update event for it has been handled.
doc, exists := store.Get("user-123")
if exists {
fmt.Printf("User: %+v\n", doc.Data)
}

Notification System

// Notifier fans each incoming event out to every subscriber channel.
type Notifier struct {
	mu          sync.RWMutex // guards subscribers
	subscribers []chan cocobase.Event
}

// NewNotifier returns a Notifier with no subscribers.
func NewNotifier() *Notifier {
	return &Notifier{}
}

// Subscribe registers and returns a new channel (buffer size 10) that will
// receive subsequent events.
func (n *Notifier) Subscribe() chan cocobase.Event {
	sub := make(chan cocobase.Event, 10)

	n.mu.Lock()
	n.subscribers = append(n.subscribers, sub)
	n.mu.Unlock()

	return sub
}

// handler delivers event to every subscriber without blocking: a subscriber
// whose channel buffer is full misses the event.
func (n *Notifier) handler(event cocobase.Event) {
	n.mu.RLock()
	defer n.mu.RUnlock()

	for i := range n.subscribers {
		select {
		case n.subscribers[i] <- event:
		default:
			// Buffer full: drop the event for this subscriber.
		}
	}
}

// Usage
notifier := NewNotifier()

// All "users" events are routed through the notifier's handler method.
conn, err := client.WatchCollection(ctx, "users", notifier.handler, "notifier")
if err != nil {
log.Fatal(err)
}
defer conn.Close()

// Subscribe in different parts of your app
ch1 := notifier.Subscribe()
ch2 := notifier.Subscribe()

// Each subscriber consumes its own channel in a separate goroutine.
// NOTE(review): nothing shown here closes these channels, so the range
// loops below do not terminate on their own.
go func() {
for event := range ch1 {
fmt.Printf("Subscriber 1: %s\n", event.Event)
}
}()

go func() {
for event := range ch2 {
fmt.Printf("Subscriber 2: %s\n", event.Event)
}
}()

Best Practices

1. Always Close Connections

// Good: Use defer to ensure cleanup
conn, err := client.WatchCollection(ctx, "users", handler, "watcher")
if err != nil {
log.Fatal(err)
}
// Deferred only after the error check, so Close is never called on a
// connection that failed to open.
defer conn.Close()

2. Handle Errors in Callbacks

// Good: Protect against panics
handler := func(event cocobase.Event) {
defer func() {
if r := recover(); r != nil {
log.Printf("Error in handler: %v\n", r)
}
}()

processEvent(event)
}

3. Use Named Connections

// Good: Use descriptive names
// The last argument is the connection name; here it says which screen
// ("dashboard") watches which collection ("users").
conn, err := client.WatchCollection(ctx, "users", handler, "dashboard-users")

4. Implement Reconnection Logic

// Good: Handle connection loss
//
// watchWithReconnect keeps a watcher on collection alive: it connects,
// polls once per second until the connection is closed, then reconnects.
// A failed connection attempt is retried after 5 seconds and a lost
// connection after 2 seconds. It blocks until ctx is cancelled, at which
// point it closes the current connection (if any) and returns.
func watchWithReconnect(client *cocobase.Client, ctx context.Context, collection string, handler func(cocobase.Event)) {
	for ctx.Err() == nil {
		conn, err := client.WatchCollection(ctx, collection, handler, "")
		if err != nil {
			log.Printf("Connection failed: %v\n", err)
			if !waitOrDone(ctx, 5*time.Second) {
				return
			}
			continue
		}

		// Wait for connection to close
		for !conn.IsClosed() {
			if !waitOrDone(ctx, 1*time.Second) {
				// Shutting down: release the connection instead of leaking it.
				if err := conn.Close(); err != nil {
					log.Printf("Error closing connection: %v\n", err)
				}
				return
			}
		}

		log.Println("Connection lost, reconnecting...")
		if !waitOrDone(ctx, 2*time.Second) {
			return
		}
	}
}

// waitOrDone pauses for d and reports true, or reports false as soon as ctx
// is cancelled.
func waitOrDone(ctx context.Context, d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return false
	case <-timer.C:
		return true
	}
}

5. Filter Events When Possible

// Good: Filter early to reduce processing
handler := func(event cocobase.Event) {
	// Updates are the only events worth processing here; everything else
	// is dropped.
	if event.Event == "update" {
		processUpdate(event.Data)
	}
}

6. Use Buffering for High-Frequency Updates

// Good: Buffer events for batch processing
buffer := make([]cocobase.Event, 0, 100)
var mu sync.Mutex // guards buffer

// handler only appends under the lock, so it returns quickly.
handler := func(event cocobase.Event) {
	mu.Lock()
	buffer = append(buffer, event)
	mu.Unlock()
}

// Process buffer periodically
go func() {
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		// Take ownership of the pending events and hand the handler a fresh
		// slice. The batch is processed after unlocking so a slow
		// processBatch does not block incoming events, and a new backing
		// array keeps later appends from overwriting the batch.
		mu.Lock()
		var batch []cocobase.Event
		if len(buffer) > 0 {
			batch = buffer
			buffer = make([]cocobase.Event, 0, 100)
		}
		mu.Unlock()

		if len(batch) > 0 {
			processBatch(batch)
		}
	}
}()

7. Keep Handlers Fast

// Good: Offload heavy processing
// Each event starts its own goroutine, so the handler returns immediately.
// NOTE(review): nothing here limits how many goroutines run at once;
// consider a bounded worker pool for high event rates.
handler := func(event cocobase.Event) {
// Quick processing only
go heavyProcessing(event) // Offload to goroutine
}

// Bad: Heavy processing in handler
// handler := func(event cocobase.Event) {
// heavyProcessing(event) // Blocks next event
// }

Previous: Authentication | Next: Storage