#!/usr/bin/env bash

set -u -o pipefail

DEVICE_ID="${DEVICE_ID:-}"
DEVICE_SECRET="${DEVICE_SECRET:-}"
GATEWAY_URL="${GATEWAY_URL:-https://supportive-art-production-3e3a.up.railway.app}"
CRM_WEBHOOK_URL="${CRM_WEBHOOK_URL:-https://www.usenextora.com/api/messages/inbound/blue}"
CRM_OUTBOUND_WEBHOOK_URL="${CRM_OUTBOUND_WEBHOOK_URL:-https://www.usenextora.com/api/messages/outbound/blue-status}"
CHECK_INTERVAL="${CHECK_INTERVAL:-10}"
ENABLE_INBOUND="${ENABLE_INBOUND:-true}"

SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
ENV_FILE="$SCRIPT_DIR/.env"
LOG_DIR="$SCRIPT_DIR/logs"
CHAT_DB_PATH="${HOME}/Library/Messages/chat.db"
OUTBOUND_STATUS_DIR="$SCRIPT_DIR/outbound-status-pending"
OUTBOUND_SYNC_CURSOR_FILE="$SCRIPT_DIR/outbound-sync-cursor"
WORKER_LOCK_DIR="$SCRIPT_DIR/worker.lock"

load_env_file() {
  if [ -f "$ENV_FILE" ]; then
    # shellcheck disable=SC1090
    . "$ENV_FILE"
  fi

  DEVICE_ID="${DEVICE_ID:-}"
  DEVICE_SECRET="${DEVICE_SECRET:-}"
  GATEWAY_URL="${GATEWAY_URL:-https://supportive-art-production-3e3a.up.railway.app}"
  CRM_WEBHOOK_URL="${CRM_WEBHOOK_URL:-https://www.usenextora.com/api/messages/inbound/blue}"
  CRM_OUTBOUND_WEBHOOK_URL="${CRM_OUTBOUND_WEBHOOK_URL:-https://www.usenextora.com/api/messages/outbound/blue-status}"
  CHECK_INTERVAL="${CHECK_INTERVAL:-10}"
  ENABLE_INBOUND="${ENABLE_INBOUND:-true}"
}

log() {
  local message="$1"
  local level="${2:-INFO}"
  mkdir -p "$LOG_DIR"
  printf '[%s] [%s] %s\n' "$(date '+%Y-%m-%d %H:%M:%S')" "$level" "$message"
}

release_worker_lock() {
  rm -rf "$WORKER_LOCK_DIR" 2>/dev/null || true
}

acquire_worker_lock() {
  local existing_pid=""

  if mkdir "$WORKER_LOCK_DIR" 2>/dev/null; then
    printf '%s' "$$" > "$WORKER_LOCK_DIR/pid"
    return 0
  fi

  if [ -f "$WORKER_LOCK_DIR/pid" ]; then
    existing_pid="$(tr -cd '0-9' < "$WORKER_LOCK_DIR/pid" 2>/dev/null)"
  fi

  if [ -n "$existing_pid" ] && kill -0 "$existing_pid" 2>/dev/null; then
    log "Worker already running with PID $existing_pid; exiting duplicate launch." "WARN"
    exit 0
  fi

  rm -rf "$WORKER_LOCK_DIR" 2>/dev/null || true
  if mkdir "$WORKER_LOCK_DIR" 2>/dev/null; then
    printf '%s' "$$" > "$WORKER_LOCK_DIR/pid"
    return 0
  fi

  log "Unable to acquire worker lock at $WORKER_LOCK_DIR" "ERROR"
  exit 1
}

normalize_phone() {
  local raw="${1:-}"
  raw="$(printf '%s' "$raw" | tr -d '\r' | sed 's/^[[:space:]]*//;s/[[:space:]]*$//')"
  if [ -z "$raw" ]; then
    printf '%s' ""
    return
  fi

  local digits
  case "$raw" in
    +*)
      digits="$(printf '%s' "$raw" | tr -cd '0-9')"
      if [ -n "$digits" ]; then
        printf '+%s' "$digits"
        return
      fi
      ;;
  esac

  digits="$(printf '%s' "$raw" | tr -cd '0-9')"
  if [ "${#digits}" -eq 10 ]; then
    printf '+1%s' "$digits"
    return
  fi
  if [ "${#digits}" -eq 11 ] && [ "${digits#1}" != "$digits" ]; then
    printf '+%s' "$digits"
    return
  fi

  printf '%s' ""
}

normalize_email() {
  local raw="${1:-}"
  raw="$(printf '%s' "$raw" | tr '[:upper:]' '[:lower:]' | tr -d '\r' | sed 's/^[[:space:]]*//;s/[[:space:]]*$//')"
  case "$raw" in
    *@*) printf '%s' "$raw" ;;
    *) printf '%s' "" ;;
  esac
}

normalize_message_text() {
  printf '%s' "${1:-}" | tr '\r\n' '  ' | sed 's/[[:space:]]\+/ /g; s/^[[:space:]]*//; s/[[:space:]]*$//'
}

build_handle_variants() {
  local raw="${1:-}"
  local digits e164
  digits="$(printf '%s' "$raw" | tr -cd '0-9')"
  e164="$(normalize_phone "$raw")"

  {
    if [ -n "$e164" ]; then
      printf '%s\n' "$e164"
      printf '%s\n' "${e164#+}"
      if [ "${e164#+1}" != "$e164" ] && [ "${#e164}" -eq 12 ]; then
        printf '%s\n' "${e164#'+1'}"
      fi
    fi

    if [ -n "$digits" ]; then
      printf '%s\n' "$digits"
      if [ "${#digits}" -eq 10 ]; then
        printf '1%s\n' "$digits"
        printf '+1%s\n' "$digits"
      fi
      if [ "${#digits}" -eq 11 ] && [ "${digits#1}" != "$digits" ]; then
        printf '%s\n' "${digits#1}"
        printf '+%s\n' "$digits"
      fi
    fi
  } | awk 'length($0) >= 10' | awk '!seen[$0]++'
}

json_path() {
  local path="$1"
  /usr/bin/ruby -rjson -e '
    data = JSON.parse(STDIN.read) rescue nil
    value = data
    ARGV[0].split(".").each do |segment|
      break if value.nil?
      if value.is_a?(Hash)
        value = value[segment]
      elsif value.is_a?(Array) && segment =~ /^\d+$/
        value = value[segment.to_i]
      else
        value = nil
      end
    end
    if value.is_a?(Array)
      puts value.map(&:to_s).join("\n")
    elsif value.nil?
      puts ""
    else
      puts value.to_s
    end
  ' "$path"
}

supports_sms_relay() {
  local raw="${1:-}"
  local digits
  if [ -z "$raw" ]; then
    return 1
  fi
  case "$raw" in
    *";+;chat"*|*"@"*) return 1 ;;
  esac
  digits="$(printf '%s' "$raw" | tr -cd '0-9')"
  [ "${#digits}" -ge 10 ]
}

send_heartbeat() {
  local tmp_file http_code snippet
  tmp_file="$(mktemp)"
  http_code="$(curl -sS --connect-timeout 5 --max-time 10 -o "$tmp_file" -w '%{http_code}' \
    -X POST "${GATEWAY_URL%/}/api/device-workers/heartbeat" \
    -H "x-device-id: $DEVICE_ID" \
    -H "x-device-secret: $DEVICE_SECRET" 2>/dev/null || printf '000')"

  if [ "$http_code" = "200" ]; then
    rm -f "$tmp_file"
    return 0
  fi

  snippet="$(tr '\n' ' ' < "$tmp_file" | cut -c1-180)"
  rm -f "$tmp_file"
  log "Heartbeat failed: $http_code $snippet" "WARN"
  return 1
}

fetch_sync_context() {
  local tmp_file http_code
  tmp_file="$(mktemp)"
  http_code="$(curl -sS --connect-timeout 5 --max-time 10 -o "$tmp_file" -w '%{http_code}' \
    -X GET "${GATEWAY_URL%/}/api/device-workers/sync-context" \
    -H "x-device-id: $DEVICE_ID" \
    -H "x-device-secret: $DEVICE_SECRET" 2>/dev/null || printf '000')"

  if [ "$http_code" != "200" ]; then
    local snippet
    snippet="$(tr '\n' ' ' < "$tmp_file" | cut -c1-180)"
    rm -f "$tmp_file"
    log "Sync context unavailable ($http_code): $snippet" "WARN"
    return 1
  fi

  SYNC_OWNER_USER_ID="$(cat "$tmp_file" | json_path "ownerUserId")"
  SYNC_LAST_INBOUND_ROWID="$(cat "$tmp_file" | json_path "lastInboundRowId")"
  [ -n "$SYNC_LAST_INBOUND_ROWID" ] || SYNC_LAST_INBOUND_ROWID=0
  rm -f "$tmp_file"
  return 0
}

update_inbound_cursor() {
  local last_rowid="$1"
  local http_code
  http_code="$(curl -sS --connect-timeout 5 --max-time 10 -o /dev/null -w '%{http_code}' \
    -X POST "${GATEWAY_URL%/}/api/device-workers/inbound-cursor" \
    -H "Content-Type: application/json" \
    -H "x-device-id: $DEVICE_ID" \
    -H "x-device-secret: $DEVICE_SECRET" \
    --data "{\"lastInboundRowId\":${last_rowid}}" 2>/dev/null || printf '000')"

  case "$http_code" in
    200|201|204) return 0 ;;
  esac

  log "Inbound cursor update failed ($http_code)" "WARN"
  return 1
}

read_outbound_sync_cursor() {
  if [ -f "$OUTBOUND_SYNC_CURSOR_FILE" ]; then
    tr -d '\r' < "$OUTBOUND_SYNC_CURSOR_FILE" | tr -cd '0-9'
    return
  fi
  printf '0'
}

write_outbound_sync_cursor() {
  local rowid="$1"
  printf '%s' "${rowid:-0}" > "$OUTBOUND_SYNC_CURSOR_FILE"
}

initialize_outbound_sync_cursor() {
  if [ -f "$OUTBOUND_SYNC_CURSOR_FILE" ]; then
    LAST_OUTBOUND_SYNC_ROWID="$(read_outbound_sync_cursor)"
    [ -n "${LAST_OUTBOUND_SYNC_ROWID:-}" ] || LAST_OUTBOUND_SYNC_ROWID=0
    return 0
  fi

  local baseline
  baseline="$(get_last_outbound_rowid)"
  baseline="${baseline:-0}"
  if [ "$baseline" -gt 25 ] 2>/dev/null; then
    baseline=$((baseline - 25))
  fi
  write_outbound_sync_cursor "$baseline"
  LAST_OUTBOUND_SYNC_ROWID="$baseline"
}

register_allow_handle() {
  local recipient="$1"
  local e164 variants payload http_code
  e164="$(normalize_phone "$recipient")"
  [ -n "$e164" ] || return 1
  variants="$(build_handle_variants "$recipient")"
  payload="$(RECIPIENT_E164="$e164" VARIANTS="$variants" /usr/bin/ruby -rjson -e '
    e164 = ENV["RECIPIENT_E164"].to_s
    variants = ENV["VARIANTS"].to_s.split("\n").map(&:strip).reject(&:empty?)
    puts JSON.generate({ e164: e164, variants: variants })
  ')"
  http_code="$(curl -sS --connect-timeout 5 --max-time 10 -o /dev/null -w '%{http_code}' \
    -X POST "${GATEWAY_URL%/}/api/device-workers/allow-handle" \
    -H "Content-Type: application/json" \
    -H "x-device-id: $DEVICE_ID" \
    -H "x-device-secret: $DEVICE_SECRET" \
    --data "$payload" 2>/dev/null || printf '000')"

  case "$http_code" in
    200|201|204) return 0 ;;
  esac

  log "Allow-handle update failed ($http_code)" "WARN"
  return 1
}

send_via_messages_service() {
  local recipient="$1"
  local body="$2"
  local transport_label="$3"
  local service_literal="$4"
  if /usr/bin/osascript - "$recipient" "$body" <<APPLESCRIPT >/dev/null 2>&1
on run argv
    set targetRecipient to item 1 of argv
    set messageBody to item 2 of argv
    tell application "Messages"
        set targetService to missing value
        try
            set targetService to 1st account whose service type = ${service_literal}
        on error
            set targetService to 1st service whose service type = ${service_literal}
        end try
        set targetBuddy to participant targetRecipient of targetService
        send messageBody to targetBuddy
    end tell
end run
APPLESCRIPT
  then
    log "Sent via $transport_label" "INFO"
    return 0
  fi

  log "$transport_label send failed" "WARN"
  return 1
}

send_imessage() {
  local recipient="$1"
  local message_body="$2"
  if send_via_messages_service "$recipient" "$message_body" "iMessage" "iMessage"; then
    return 0
  fi

  if supports_sms_relay "$recipient"; then
    log "iMessage unavailable for $recipient; trying SMS relay" "INFO"
    send_via_messages_service "$recipient" "$message_body" "SMS relay" "SMS"
    return $?
  fi

  return 1
}

handle_matches_recipient() {
  local recipient="$1"
  local handle="$2"
  local normalized_handle email variant
  normalized_handle="$(normalize_phone "$handle")"
  email="$(normalize_email "$handle")"

  if [ -n "$email" ]; then
    [ "$email" = "$(normalize_email "$recipient")" ]
    return
  fi

  while IFS= read -r variant; do
    [ -n "$variant" ] || continue
    if [ "$variant" = "$handle" ] || [ "$variant" = "$normalized_handle" ]; then
      return 0
    fi
  done <<EOF
$(build_handle_variants "$recipient")
EOF

  return 1
}

get_last_outbound_rowid() {
  sqlite3 -readonly "$CHAT_DB_PATH" "SELECT COALESCE(MAX(ROWID), 0) FROM message WHERE is_from_me = 1;" 2>/dev/null | tr -d '\r'
}

confirm_recent_outbound_message() {
  local baseline_rowid="$1"
  local recipient="$2"
  local body="$3"
  local expected_text attempt rowid handle text _date rows

  expected_text="$(normalize_message_text "$body")"

  if ! sqlite3 -readonly "$CHAT_DB_PATH" "SELECT 1;" >/dev/null 2>&1; then
    log "Outbound confirmation failed: cannot read Messages DB" "ERROR"
    return 1
  fi

  for attempt in 1 2 3 4 5 6 7 8; do
    rows="$(
      sqlite3 -readonly -separator $'\x1f' "$CHAT_DB_PATH" "
        SELECT
          m.ROWID,
          COALESCE(
            NULLIF(h.id, ''),
            NULLIF((
              SELECT h2.id
              FROM chat_message_join cmj2
              JOIN chat_handle_join chj2 ON chj2.chat_id = cmj2.chat_id
              JOIN handle h2 ON h2.ROWID = chj2.handle_id
              WHERE cmj2.message_id = m.ROWID
              ORDER BY h2.ROWID ASC
              LIMIT 1
            ), ''),
            ''
          ),
          COALESCE(m.text, ''),
          m.date
        FROM message m
        LEFT JOIN handle h ON m.handle_id = h.ROWID
        WHERE m.is_from_me = 1
          AND m.ROWID > ${baseline_rowid}
        ORDER BY m.ROWID DESC
        LIMIT 20;
      " 2>/dev/null
    )"
    while IFS=$'\x1f' read -r rowid handle text _date; do
      [ -n "${rowid:-}" ] || continue
      if handle_matches_recipient "$recipient" "$handle" && [ "$(normalize_message_text "$text")" = "$expected_text" ]; then
        log "Confirmed outbound message in Messages DB (row ${rowid})" "SUCCESS"
        return 0
      fi
    done <<EOF
$rows
EOF
    sleep 1
  done

  log "Unable to confirm outbound message in Messages DB" "WARN"
  return 1
}

build_outbound_status_payload() {
  local message_id="$1"
  local status="$2"
  local error_message="${3:-}"
  MESSAGE_ID="$message_id" DELIVERY_STATUS="$status" DELIVERY_ERROR="$error_message" /usr/bin/ruby -rjson -e '
    payload = {
      providerId: ENV["MESSAGE_ID"].to_s,
      messageId: ENV["MESSAGE_ID"].to_s,
      status: ENV["DELIVERY_STATUS"].to_s
    }
    error = ENV["DELIVERY_ERROR"].to_s.strip
    payload[:error] = error unless error.empty?
    puts JSON.generate(payload)
  '
}

post_outbound_status_payload() {
  local payload="$1"
  local webhook_secret http_code
  webhook_secret="${MESSAGING_GATEWAY_WEBHOOK_SECRET:-${CRM_WEBHOOK_SECRET:-}}"

  if [ -n "$webhook_secret" ]; then
    http_code="$(curl -sS --connect-timeout 5 --max-time 10 -o /dev/null -w '%{http_code}' \
      -X POST "$CRM_OUTBOUND_WEBHOOK_URL" \
      -H "Content-Type: application/json" \
      -H "x-device-id: $DEVICE_ID" \
      -H "x-device-secret: $DEVICE_SECRET" \
      -H "x-webhook-secret: $webhook_secret" \
      --data "$payload" 2>/dev/null || printf '000')"
  else
    http_code="$(curl -sS --connect-timeout 5 --max-time 10 -o /dev/null -w '%{http_code}' \
      -X POST "$CRM_OUTBOUND_WEBHOOK_URL" \
      -H "Content-Type: application/json" \
      -H "x-device-id: $DEVICE_ID" \
      -H "x-device-secret: $DEVICE_SECRET" \
      --data "$payload" 2>/dev/null || printf '000')"
  fi

  case "$http_code" in
    200|201|204) return 0 ;;
  esac

  return 1
}

queue_outbound_status_retry() {
  local message_id="$1"
  local status="$2"
  local error_message="${3:-}"
  mkdir -p "$OUTBOUND_STATUS_DIR"
  build_outbound_status_payload "$message_id" "$status" "$error_message" > "$OUTBOUND_STATUS_DIR/${message_id}.json"
}

notify_outbound_to_crm() {
  local message_id="$1"
  local status="$2"
  local error_message="${3:-}"
  local payload attempt
  payload="$(build_outbound_status_payload "$message_id" "$status" "$error_message")"

  for attempt in 1 2 3 4 5; do
    if post_outbound_status_payload "$payload"; then
      return 0
    fi
    sleep 2
  done

  queue_outbound_status_retry "$message_id" "$status" "$error_message"
  log "Outbound status callback deferred for retry" "WARN"
  return 1
}

flush_outbound_status_queue() {
  local file payload
  mkdir -p "$OUTBOUND_STATUS_DIR"
  for file in "$OUTBOUND_STATUS_DIR"/*.json; do
    [ -e "$file" ] || return 0
    payload="$(cat "$file" 2>/dev/null || true)"
    [ -n "$payload" ] || { rm -f "$file"; continue; }
    if post_outbound_status_payload "$payload"; then
      rm -f "$file"
      log "Flushed deferred outbound status $(basename "$file" .json)" "INFO"
    fi
  done
}

mark_job() {
  local message_id="$1"
  local endpoint="$2"
  local payload="$3"
  curl -sS --connect-timeout 5 --max-time 10 -o /dev/null \
    -X POST "${GATEWAY_URL%/}/api/device-workers/jobs/${message_id}/${endpoint}" \
    -H "Content-Type: application/json" \
    -H "x-device-id: $DEVICE_ID" \
    -H "x-device-secret: $DEVICE_SECRET" \
    --data "$payload" 2>/dev/null || log "Failed to mark job ${endpoint}" "ERROR"
}

claim_and_process_job() {
  local tmp_file http_code recipient body message_id claim_token payload baseline_rowid
  tmp_file="$(mktemp)"
  http_code="$(curl -sS --connect-timeout 5 --max-time 10 -o "$tmp_file" -w '%{http_code}' \
    -X POST "${GATEWAY_URL%/}/api/device-workers/jobs/claim" \
    -H "x-device-id: $DEVICE_ID" \
    -H "x-device-secret: $DEVICE_SECRET" 2>/dev/null || printf '000')"

  if [ "$http_code" != "200" ]; then
    rm -f "$tmp_file"
    [ "$http_code" = "204" ] || [ "$http_code" = "000" ] || log "Claim failed: $http_code" "WARN"
    return 1
  fi

  recipient="$(cat "$tmp_file" | json_path "job.message.to")"
  body="$(cat "$tmp_file" | json_path "job.message.body")"
  message_id="$(cat "$tmp_file" | json_path "job.message.id")"
  claim_token="$(cat "$tmp_file" | json_path "job.claimToken")"
  rm -f "$tmp_file"

  if [ -z "$recipient" ] || [ -z "$body" ] || [ -z "$message_id" ] || [ -z "$claim_token" ]; then
    return 1
  fi

  baseline_rowid="$(get_last_outbound_rowid)"
  baseline_rowid="${baseline_rowid:-0}"
  log "Sending to ${recipient}: $(printf '%s' "$body" | tr '\n' ' ' | cut -c1-80)" "INFO"
  if send_imessage "$recipient" "$body" && confirm_recent_outbound_message "$baseline_rowid" "$recipient" "$body"; then
    register_allow_handle "$recipient" >/dev/null 2>&1 || true
    notify_outbound_to_crm "$message_id" "sent" "" >/dev/null 2>&1 || true
    payload="$(CLAIM_TOKEN="$claim_token" /usr/bin/ruby -rjson -e 'puts JSON.generate({ claimToken: ENV["CLAIM_TOKEN"].to_s })')"
    mark_job "$message_id" "complete" "$payload"
    log "Message sent successfully" "SUCCESS"
    return 0
  fi

  notify_outbound_to_crm "$message_id" "failed" "Failed to confirm delivery in Messages app" >/dev/null 2>&1 || true
  payload="$(CLAIM_TOKEN="$claim_token" /usr/bin/ruby -rjson -e 'puts JSON.generate({ claimToken: ENV["CLAIM_TOKEN"].to_s, error: "Failed to send via Messages app" })')"
  mark_job "$message_id" "fail" "$payload"
  log "Failed to send message" "ERROR"
  return 1
}

check_database_access() {
  if [ ! -f "$CHAT_DB_PATH" ]; then
    log "Messages database not found: $CHAT_DB_PATH" "ERROR"
    return 1
  fi

  if sqlite3 -readonly "$CHAT_DB_PATH" "SELECT COUNT(*) FROM message;" >/dev/null 2>&1; then
    local count
    count="$(sqlite3 -readonly "$CHAT_DB_PATH" "SELECT COUNT(*) FROM message;" 2>/dev/null | tr -d '\r')"
    log "Inbound DB access OK (${count} total messages)" "SUCCESS"
    return 0
  fi

  log "Inbound disabled: cannot read Messages DB" "WARN"
  log "Grant Full Disk Access to Terminal and restart the worker." "WARN"
  return 1
}

get_last_inbound_rowid() {
  sqlite3 -readonly "$CHAT_DB_PATH" "SELECT COALESCE(MAX(ROWID), 0) FROM message WHERE is_from_me = 0;" 2>/dev/null | tr -d '\r'
}

fetch_new_inbound_messages() {
  local last_rowid="$1"
  sqlite3 -readonly -separator $'\x1f' "$CHAT_DB_PATH" "
    SELECT
      m.ROWID,
      COALESCE(
        NULLIF(h.id, ''),
        NULLIF((
          SELECT h2.id
          FROM chat_message_join cmj2
          JOIN chat_handle_join chj2 ON chj2.chat_id = cmj2.chat_id
          JOIN handle h2 ON h2.ROWID = chj2.handle_id
          WHERE cmj2.message_id = m.ROWID
          ORDER BY h2.ROWID ASC
          LIMIT 1
        ), ''),
        ''
      ),
      replace(replace(m.text, char(10), ' '), char(13), ' '),
      COALESCE(m.guid, ''),
      m.date
    FROM message m
    LEFT JOIN handle h ON m.handle_id = h.ROWID
    WHERE m.is_from_me = 0
      AND m.ROWID > ${last_rowid}
      AND m.text IS NOT NULL
      AND LENGTH(TRIM(m.text)) > 0
    ORDER BY m.ROWID ASC
    LIMIT 200;
  " 2>/dev/null
}

fetch_new_outbound_messages() {
  local last_rowid="$1"
  sqlite3 -readonly -separator $'\x1f' "$CHAT_DB_PATH" "
    SELECT
      m.ROWID,
      COALESCE(
        NULLIF(h.id, ''),
        NULLIF((
          SELECT h2.id
          FROM chat_message_join cmj2
          JOIN chat_handle_join chj2 ON chj2.chat_id = cmj2.chat_id
          JOIN handle h2 ON h2.ROWID = chj2.handle_id
          WHERE cmj2.message_id = m.ROWID
          ORDER BY h2.ROWID ASC
          LIMIT 1
        ), ''),
        ''
      ),
      replace(replace(m.text, char(10), ' '), char(13), ' '),
      COALESCE(m.guid, ''),
      m.date
    FROM message m
    LEFT JOIN handle h ON m.handle_id = h.ROWID
    WHERE m.is_from_me = 1
      AND m.ROWID > ${last_rowid}
      AND m.text IS NOT NULL
      AND LENGTH(TRIM(m.text)) > 0
    ORDER BY m.ROWID ASC
    LIMIT 200;
  " 2>/dev/null
}

forward_inbound_to_crm() {
  local owner_user_id="$1"
  local rowid="$2"
  local sender="$3"
  local text="$4"
  local guid="$5"
  local date_raw="$6"
  local from_phone from_email sender_value sender_type webhook_secret payload http_code

  from_phone="$(normalize_phone "$sender")"
  from_email="$(normalize_email "$sender")"

  if [ -n "$from_phone" ]; then
    sender_value="$from_phone"
    sender_type="phone"
  elif [ -n "$from_email" ]; then
    sender_value="$from_email"
    sender_type="email"
  else
    log "Skipping inbound row $rowid: unsupported sender ($sender)" "WARN"
    return 0
  fi

  webhook_secret="${MESSAGING_GATEWAY_WEBHOOK_SECRET:-${CRM_WEBHOOK_SECRET:-}}"
  payload="$(OWNER_USER_ID="$owner_user_id" SENDER_VALUE="$sender_value" TEXT_VALUE="$text" MESSAGE_ID="${guid:-imsg_${rowid}}" ROW_ID="$rowid" DATE_RAW="$date_raw" RAW_SENDER="$sender" SENDER_TYPE="$sender_type" /usr/bin/ruby -rjson -e '
    require "time"
    payload = {
      from: ENV["SENDER_VALUE"].to_s,
      text: ENV["TEXT_VALUE"].to_s.strip,
      messageId: ENV["MESSAGE_ID"].to_s,
      metadata: {
        source: "mac-worker",
        rowId: ENV["ROW_ID"].to_i,
        timestamp: Time.at(((ENV["DATE_RAW"].to_f > 1_000_000_000_000) ? (ENV["DATE_RAW"].to_f / 1_000_000_000) : ENV["DATE_RAW"].to_f) + 978307200).utc.iso8601,
        rawSender: ENV["RAW_SENDER"].to_s,
        senderType: ENV["SENDER_TYPE"].to_s
      }
    }
    owner = ENV["OWNER_USER_ID"].to_s.strip
    payload[:ownerUserId] = owner unless owner.empty?
    puts JSON.generate(payload)
  ')"

  if [ -n "$webhook_secret" ]; then
    http_code="$(curl -sS --connect-timeout 5 --max-time 10 -o /dev/null -w '%{http_code}' \
      -X POST "$CRM_WEBHOOK_URL" \
      -H "Content-Type: application/json" \
      -H "x-device-id: $DEVICE_ID" \
      -H "x-device-secret: $DEVICE_SECRET" \
      -H "x-webhook-secret: $webhook_secret" \
      --data "$payload" 2>/dev/null || printf '000')"
  else
    http_code="$(curl -sS --connect-timeout 5 --max-time 10 -o /dev/null -w '%{http_code}' \
      -X POST "$CRM_WEBHOOK_URL" \
      -H "Content-Type: application/json" \
      -H "x-device-id: $DEVICE_ID" \
      -H "x-device-secret: $DEVICE_SECRET" \
      --data "$payload" 2>/dev/null || printf '000')"
  fi

  case "$http_code" in
    200|201) return 0 ;;
  esac

  log "Inbound forward failed ($http_code)" "ERROR"
  return 1
}

process_inbound() {
  local current_rowid="$1"
  local owner_user_id="$2"
  local rows rowid sender text guid date_raw cursor
  rows="$(fetch_new_inbound_messages "$current_rowid")"
  [ -n "$rows" ] || return 0

  cursor="$current_rowid"
  log "Found new inbound message(s)" "INFO"
  while IFS=$'\x1f' read -r rowid sender text guid date_raw; do
    [ -n "${rowid:-}" ] || continue
    if ! forward_inbound_to_crm "$owner_user_id" "$rowid" "$sender" "$text" "$guid" "$date_raw"; then
      return 1
    fi
    cursor="$rowid"
    if ! update_inbound_cursor "$cursor"; then
      return 1
    fi
    log "Inbound synced from $sender: $(printf '%s' "$text" | cut -c1-60)" "SUCCESS"
  done <<EOF
$rows
EOF

  LAST_INBOUND_ROWID="$cursor"
  return 0
}

forward_outbound_sync_to_crm() {
  local owner_user_id="$1"
  local rowid="$2"
  local recipient="$3"
  local text="$4"
  local guid="$5"
  local date_raw="$6"
  local to_phone to_email recipient_value webhook_secret payload http_code response_file response_body

  to_phone="$(normalize_phone "$recipient")"
  to_email="$(normalize_email "$recipient")"

  if [ -n "$to_phone" ]; then
    recipient_value="$to_phone"
  elif [ -n "$to_email" ]; then
    recipient_value="$to_email"
  else
    log "Skipping outbound row $rowid: unsupported recipient ($recipient)" "WARN"
    return 0
  fi

  webhook_secret="${MESSAGING_GATEWAY_WEBHOOK_SECRET:-${CRM_WEBHOOK_SECRET:-}}"
  payload="$(OWNER_USER_ID="$owner_user_id" RECIPIENT_VALUE="$recipient_value" TEXT_VALUE="$text" MESSAGE_ID="${guid:-imsg_out_${rowid}}" ROW_ID="$rowid" DATE_RAW="$date_raw" RAW_RECIPIENT="$recipient" /usr/bin/ruby -rjson -e '
    require "time"
    payload = {
      to: ENV["RECIPIENT_VALUE"].to_s,
      text: ENV["TEXT_VALUE"].to_s.strip,
      messageId: ENV["MESSAGE_ID"].to_s,
      isOutbound: true,
      metadata: {
        source: "mac-worker",
        rowId: ENV["ROW_ID"].to_i,
        timestamp: Time.at(((ENV["DATE_RAW"].to_f > 1_000_000_000_000) ? (ENV["DATE_RAW"].to_f / 1_000_000_000) : ENV["DATE_RAW"].to_f) + 978307200).utc.iso8601,
        rawRecipient: ENV["RAW_RECIPIENT"].to_s
      }
    }
    owner = ENV["OWNER_USER_ID"].to_s.strip
    payload[:ownerUserId] = owner unless owner.empty?
    puts JSON.generate(payload)
  ')"

  response_file="$(mktemp)"
  OUTBOUND_SYNC_LAST_ACTION="failed"
  OUTBOUND_SYNC_LAST_REASON=""

  if [ -n "$webhook_secret" ]; then
    http_code="$(curl -sS --connect-timeout 5 --max-time 10 -o "$response_file" -w '%{http_code}' \
      -X POST "$CRM_WEBHOOK_URL" \
      -H "Content-Type: application/json" \
      -H "x-device-id: $DEVICE_ID" \
      -H "x-device-secret: $DEVICE_SECRET" \
      -H "x-webhook-secret: $webhook_secret" \
      --data "$payload" 2>/dev/null || printf '000')"
  else
    http_code="$(curl -sS --connect-timeout 5 --max-time 10 -o "$response_file" -w '%{http_code}' \
      -X POST "$CRM_WEBHOOK_URL" \
      -H "Content-Type: application/json" \
      -H "x-device-id: $DEVICE_ID" \
      -H "x-device-secret: $DEVICE_SECRET" \
      --data "$payload" 2>/dev/null || printf '000')"
  fi

  response_body="$(tr '\n' ' ' < "$response_file" 2>/dev/null | tr -s ' ' | cut -c1-220)"
  rm -f "$response_file"

  case "$http_code" in
    200|201)
      OUTBOUND_SYNC_LAST_ACTION="synced"
      return 0
      ;;
    202)
      OUTBOUND_SYNC_LAST_ACTION="skipped"
      OUTBOUND_SYNC_LAST_REASON="${response_body:-client_not_resolved}"
      return 0
      ;;
  esac

  log "Outbound sync forward failed ($http_code)" "ERROR"
  return 1
}

process_outbound_sync() {
  local current_rowid="$1"
  local owner_user_id="$2"
  local rows rowid recipient text guid date_raw cursor
  rows="$(fetch_new_outbound_messages "$current_rowid")"
  [ -n "$rows" ] || return 0

  cursor="$current_rowid"
  log "Found new outbound message(s) to mirror" "INFO"
  while IFS=$'\x1f' read -r rowid recipient text guid date_raw; do
    [ -n "${rowid:-}" ] || continue
    if ! forward_outbound_sync_to_crm "$owner_user_id" "$rowid" "$recipient" "$text" "$guid" "$date_raw"; then
      return 1
    fi
    cursor="$rowid"
    write_outbound_sync_cursor "$cursor"
    if [ "${OUTBOUND_SYNC_LAST_ACTION:-synced}" = "skipped" ]; then
      log "Outbound mirror skipped for $recipient: ${OUTBOUND_SYNC_LAST_REASON:-client_not_resolved}" "WARN"
    else
      log "Outbound synced to $recipient: $(printf '%s' "$text" | cut -c1-60)" "SUCCESS"
    fi
  done <<EOF
$rows
EOF

  LAST_OUTBOUND_SYNC_ROWID="$cursor"
  return 0
}

run_loop() {
  local inbound_enabled="no"

  if [ -z "$DEVICE_ID" ] || [ -z "$DEVICE_SECRET" ]; then
    log "Missing DEVICE_ID or DEVICE_SECRET in $ENV_FILE" "ERROR"
    exit 1
  fi

  if [ "$ENABLE_INBOUND" = "true" ] || [ "$ENABLE_INBOUND" = "1" ] || [ "$ENABLE_INBOUND" = "yes" ] || [ "$ENABLE_INBOUND" = "on" ]; then
    if check_database_access; then
      inbound_enabled="yes"
    fi
  fi

  if fetch_sync_context; then
    LAST_INBOUND_ROWID="${SYNC_LAST_INBOUND_ROWID:-0}"
  else
    LAST_INBOUND_ROWID=0
  fi
  initialize_outbound_sync_cursor || LAST_OUTBOUND_SYNC_ROWID=0

  if [ "$inbound_enabled" = "yes" ] && [ "${LAST_INBOUND_ROWID:-0}" -le 0 ]; then
    local baseline
    baseline="$(get_last_inbound_rowid)"
    baseline="${baseline:-0}"
    if update_inbound_cursor "$baseline"; then
      LAST_INBOUND_ROWID="$baseline"
    fi
  fi

  log "iMessage Worker started (shell runtime)"
  log "Device ID: $DEVICE_ID"
  log "Gateway: ${GATEWAY_URL%/}"
  log "CRM webhook: $CRM_WEBHOOK_URL"
  log "CRM outbound webhook: $CRM_OUTBOUND_WEBHOOK_URL"
  log "Checking every ${CHECK_INTERVAL} seconds"
  log "Inbound enabled: $inbound_enabled"
  log "Outbound mirror cursor: ${LAST_OUTBOUND_SYNC_ROWID:-0}"

  while true; do
    send_heartbeat || true
    flush_outbound_status_queue || true
    claim_and_process_job || true

    if [ "$inbound_enabled" = "yes" ] && fetch_sync_context; then
      local current_rowid owner_user_id baseline
      owner_user_id="${SYNC_OWNER_USER_ID:-}"
      current_rowid="${SYNC_LAST_INBOUND_ROWID:-0}"
      if [ "${current_rowid:-0}" -le 0 ]; then
        baseline="$(get_last_inbound_rowid)"
        baseline="${baseline:-0}"
        if update_inbound_cursor "$baseline"; then
          current_rowid="$baseline"
        fi
      fi
      process_inbound "$current_rowid" "$owner_user_id" || true
      process_outbound_sync "${LAST_OUTBOUND_SYNC_ROWID:-0}" "$owner_user_id" || true
    fi

    sleep "$CHECK_INTERVAL"
  done
}

trap 'release_worker_lock; log "Worker stopped by user"; exit 0' INT TERM
trap 'release_worker_lock' EXIT

mkdir -p "$LOG_DIR" "$OUTBOUND_STATUS_DIR"
load_env_file
acquire_worker_lock
run_loop
