PHP WebSocket examples for real-time financial news streaming
<?php
require_once __DIR__ . '/vendor/autoload.php';
use Ratchet\Client\WebSocket;
use Ratchet\Client\Connector;
use Ratchet\RFC6455\Messaging\MessageInterface;
// Minimal standalone client: connect, authenticate, subscribe, print incoming news.
$connector = new Connector();

// NOTE: 'byul_api_key' is only a placeholder; export BYUL_API_KEY for real use.
$apiKey = getenv('BYUL_API_KEY') ?: 'byul_api_key';

// The connector's second argument is a plain list of sub-protocol names
// (request headers go in the third argument), so pass the name itself rather
// than a header-style map.
$connector('wss://api.byul.ai/news-v2', ['echo-protocol'])
    ->then(function (WebSocket $conn) use ($apiKey) {
        echo "Connected to Byul AI\n";

        // Attach the listener before sending anything so no reply can be missed.
        $conn->on('message', function (MessageInterface $msg) {
            $data = json_decode($msg->getPayload(), true);

            // Payloads without a usable "type" fall through the switch untouched.
            switch ($data['type'] ?? '') {
                case 'auth:success':
                    echo "Authentication successful\n";
                    // The plan is optional in practice; do not fail on its absence.
                    echo "Plan: " . ($data['user']['plan'] ?? 'unknown') . "\n";
                    break;
                case 'news:subscribed':
                    echo "Subscribed to news feed\n";
                    break;
                case 'news:data':
                    handleNewsData($data);
                    break;
                case 'news:error':
                    echo "Error: " . ($data['message'] ?? 'unknown error') . "\n";
                    break;
            }
        });

        // Send authentication
        $conn->send(json_encode([
            'type' => 'auth',
            'apiKey' => $apiKey,
        ]));

        // Subscribe to high-importance news (date range)
        $conn->send(json_encode([
            'type' => 'news:subscribe',
            'minImportance' => 8,
            'limit' => 20,
            'startDate' => '2024-01-01T00:00:00.000Z',
            'endDate' => '2024-01-31T23:59:59.999Z',
        ]));
    })
    ->otherwise(function (\Exception $e) {
        echo "Could not connect: {$e->getMessage()}\n";
    });
/**
 * Print a summary of a `news:data` payload and escalate critical articles.
 *
 * Articles are read from $data['data']['news']; a missing list is treated as
 * empty. Articles scoring 9 or above are passed to sendCriticalAlert().
 *
 * @param array $data Decoded `news:data` message.
 */
function handleNewsData($data) {
    $items = $data['data']['news'] ?? [];
    $total = count($items);
    echo "Received " . $total . " breaking news articles\n";

    foreach ($items as $item) {
        $score = $item['importanceScore'];
        $headline = $item['title'];

        // Regular articles are only printed.
        if ($score < 9) {
            echo "📢 {$headline} (Score: {$score}/10)\n";
            continue;
        }

        // Critical articles are printed and then trigger a high-priority alert.
        echo "CRITICAL: {$headline} (Score: {$score}/10)\n";
        sendCriticalAlert($item);
    }
}
/**
 * Placeholder alert hook for critical articles.
 *
 * Currently only prints a confirmation line; replace the body with real
 * delivery logic (email, SMS, push notification, etc.).
 *
 * @param array $article Article data; its 'title' entry is printed.
 */
function sendCriticalAlert($article) {
    $headline = $article['title'];
    echo " → Alert sent for: " . $headline . "\n";
}
<?php
// app/Services/ByulWebSocketService.php
namespace App\Services;
use Illuminate\Support\Facades\Log;
use Ratchet\Client\Connector;
use Ratchet\Client\WebSocket;
/**
 * Laravel service wrapping the Pawl WebSocket connector for the Byul news feed.
 *
 * Connects, authenticates, lets callers subscribe with filters, stores incoming
 * articles and broadcasts the important ones.
 */
class ByulWebSocketService
{
    /** @var Connector Invokable Pawl connector. */
    private $connector;

    /** @var WebSocket|null Open connection, set once connect() has resolved. */
    private $connection;

    /** @var string|null API key sent in the auth message. */
    private $apiKey;

    public function __construct()
    {
        $this->connector = new Connector();
        // Accept the key from config/services.php as well as from a dedicated
        // config/byul.php file, whichever the application defines.
        $this->apiKey = config('services.byul.api_key') ?? config('byul.api_key');
    }

    /**
     * Open the WebSocket connection and send the auth message.
     *
     * @return \React\Promise\PromiseInterface Resolves with the WebSocket
     *         connection; rejects (after logging) when the connection fails.
     */
    public function connect(): \React\Promise\PromiseInterface
    {
        $url = config('byul.websocket_url') ?? 'wss://api.byul.ai/news-v2';

        // The connector is an invokable object stored in a property, so it must
        // be wrapped in parentheses; `$this->connector(...)` would look for a
        // method named "connector" and fail.
        return ($this->connector)($url)
            ->then(function (WebSocket $conn) {
                $this->connection = $conn;

                // Handle incoming messages
                $conn->on('message', [$this, 'handleMessage']);

                // Authenticate
                $conn->send(json_encode([
                    'type' => 'auth',
                    'apiKey' => $this->apiKey,
                ]));

                Log::info('Connected to Byul WebSocket');

                return $conn;
            })
            ->otherwise(function (\Exception $e) {
                Log::error('WebSocket connection failed: ' . $e->getMessage());
                // Re-throw so callers chaining on the promise still see the failure.
                throw $e;
            });
    }

    /**
     * Send a `news:subscribe` message.
     *
     * @param array $filters Extra subscription fields; they override the
     *                       defaults (minImportance = 7).
     *
     * @throws \RuntimeException When called before connect() has resolved.
     */
    public function subscribeToNews(array $filters = [])
    {
        if ($this->connection === null) {
            throw new \RuntimeException('Cannot subscribe: WebSocket is not connected yet.');
        }

        $message = array_merge([
            'type' => 'news:subscribe',
            'minImportance' => 7,
        ], $filters);

        $this->connection->send(json_encode($message));
    }

    /**
     * Dispatch one incoming WebSocket message by its "type" field.
     *
     * @param object $msg Message object exposing getPayload().
     */
    public function handleMessage($msg)
    {
        $data = json_decode($msg->getPayload(), true);

        if (!is_array($data)) {
            Log::warning('Ignoring WebSocket message that is not a JSON object');
            return;
        }

        switch ($data['type'] ?? '') {
            case 'news:data':
                $this->processNewsData($data);
                break;
            case 'auth:success':
                Log::info('WebSocket authenticated successfully');
                break;
        }
    }

    /**
     * Persist every article of a `news:data` payload and broadcast those with
     * an importance score of 8 or higher.
     *
     * @param array $data Decoded `news:data` message.
     */
    private function processNewsData($data)
    {
        $articles = $data['data']['news'] ?? [];

        foreach ($articles as $article) {
            // Without an id, title and score the record cannot be stored or ranked.
            if (!isset($article['_id'], $article['title'], $article['importanceScore'])) {
                Log::warning('Skipping malformed news article', ['article' => $article]);
                continue;
            }

            // Store in database (upsert keyed on the upstream id).
            \App\Models\NewsArticle::updateOrCreate(
                ['external_id' => $article['_id']],
                [
                    'title' => $article['title'],
                    'importance_score' => $article['importanceScore'],
                    'category' => $article['category'] ?? null,
                    'symbols' => json_encode($article['symbols'] ?? []),
                    'published_at' => $article['date'] ?? null,
                    'url' => $article['url'] ?? null,
                ]
            );

            // Broadcast to users
            if ($article['importanceScore'] >= 8) {
                broadcast(new \App\Events\BreakingNewsEvent($article));
            }
        }
    }
}
<?php
// Portfolio-specific news monitoring
/**
 * Watches the news stream for a fixed list of ticker symbols and sends an
 * alert whenever an article mentions one of them.
 */
class PortfolioNewsMonitor
{
    /** @var string[] Ticker symbols being watched, e.g. ['AAPL', 'GOOGL', 'MSFT']. */
    private $portfolio;

    /** @var \App\Services\ByulWebSocketService */
    private $webSocketService;

    /** @var object|null Optional notifier exposing send(string $message). */
    private $notificationService;

    /**
     * @param array       $portfolio           Ticker symbols to monitor.
     * @param object|null $notificationService Optional notifier with a
     *        send(string) method; when omitted, alerts are printed to stdout.
     */
    public function __construct(array $portfolio, $notificationService = null)
    {
        $this->portfolio = $portfolio; // ['AAPL', 'GOOGL', 'MSFT']
        $this->webSocketService = new \App\Services\ByulWebSocketService();
        $this->notificationService = $notificationService;
    }

    /**
     * Connect, subscribe once per portfolio symbol, and route every incoming
     * article through processNewsForPortfolio().
     */
    public function startMonitoring()
    {
        $this->webSocketService->connect()
            ->then(function ($conn) {
                // Without this listener the monitor would subscribe but never
                // evaluate any article against the portfolio.
                $conn->on('message', function ($msg) {
                    $data = json_decode($msg->getPayload(), true);
                    if (($data['type'] ?? '') !== 'news:data') {
                        return;
                    }
                    foreach ($data['data']['news'] ?? [] as $article) {
                        $this->processNewsForPortfolio($article);
                    }
                });

                // Subscribe to portfolio-specific news
                foreach ($this->portfolio as $symbol) {
                    $this->webSocketService->subscribeToNews([
                        'symbol' => $symbol,
                        'minImportance' => 6,
                    ]);
                }

                echo "✓ Monitoring " . count($this->portfolio) . " symbols\n";
            });
    }

    /**
     * Send an alert when the article's symbols overlap with the portfolio.
     *
     * @param array $article Article data; 'symbols' is optional.
     */
    public function processNewsForPortfolio($article)
    {
        $symbols = $article['symbols'] ?? [];
        $portfolioMatch = array_intersect($symbols, $this->portfolio);

        if (!empty($portfolioMatch)) {
            $this->sendPortfolioAlert($article, $portfolioMatch);
        }
    }

    /**
     * Build the alert text and hand it to the notifier (or print it when no
     * notifier was injected).
     *
     * @param array    $article        Article that triggered the alert.
     * @param string[] $matchedSymbols Portfolio symbols mentioned by the article.
     */
    private function sendPortfolioAlert($article, $matchedSymbols)
    {
        $title = $article['title'] ?? '';
        $score = $article['importanceScore'] ?? 'n/a';

        $message = "Portfolio Alert!\n";
        $message .= "Symbols: " . implode(', ', $matchedSymbols) . "\n";
        $message .= "News: {$title}\n";
        $message .= "Importance: {$score}/10\n";

        // Send notification (email, SMS, Slack, etc.)
        if ($this->notificationService !== null) {
            $this->notificationService->send($message);
            return;
        }

        // No notifier configured: fall back to stdout instead of failing.
        echo $message;
    }
}
// Usage: build a monitor for four ticker symbols and start it.
// startMonitoring() connects and then subscribes once per symbol.
$portfolio = ['AAPL', 'GOOGL', 'MSFT', 'AMZN'];
$monitor = new PortfolioNewsMonitor($portfolio);
$monitor->startMonitoring();
<?php
// Generate trading signals from news
/**
 * Derives simple trading signals from news headlines using keyword rules.
 *
 * Each rule maps a lowercase keyword to a minimum importance score and the
 * action to emit when a headline contains that keyword.
 */
class NewsTradingSignals
{
    /** @var array<string, array{importance: int, action: string}> */
    private $signalRules;

    public function __construct()
    {
        $this->signalRules = [
            'fed' => ['importance' => 8, 'action' => 'HEDGE'],
            'earnings' => ['importance' => 7, 'action' => 'POSITION'],
            'merger' => ['importance' => 8, 'action' => 'BUY'],
        ];
    }

    /**
     * Return the signal of the first rule whose keyword appears in the title
     * and whose importance threshold is met.
     *
     * Matching is a case-insensitive substring test, so a keyword also matches
     * inside longer words.
     *
     * @param array $article Article data ('title', 'importanceScore', optional
     *                       'symbols' and 'url').
     *
     * @return array|null Signal array, or null when no rule applies.
     */
    public function analyzeNews($article)
    {
        // Normalise once; the title does not change between rules.
        $title = strtolower($article['title'] ?? '');
        $importance = $article['importanceScore'] ?? 0;

        foreach ($this->signalRules as $keyword => $rule) {
            if (str_contains($title, $keyword) && $importance >= $rule['importance']) {
                return $this->generateSignal($article, $rule['action']);
            }
        }

        return null;
    }

    /**
     * Assemble the signal payload for an article.
     *
     * @param array  $article Article that triggered the rule.
     * @param string $action  Action taken from the matching rule.
     *
     * @return array Signal with action, confidence (score / 10), symbols,
     *               reason, timestamp and url.
     */
    private function generateSignal($article, $action)
    {
        return [
            'action' => $action,
            'confidence' => ($article['importanceScore'] ?? 0) / 10,
            'symbols' => $article['symbols'] ?? [],
            'reason' => $article['title'] ?? '',
            // Use Laravel's now() helper when it exists; otherwise fall back to
            // the standard library so the class also works outside Laravel.
            'timestamp' => function_exists('now') ? now() : new \DateTimeImmutable(),
            'url' => $article['url'] ?? null,
        ];
    }
}
<?php
// Laravel controller for real-time dashboard
namespace App\Http\Controllers;
use Illuminate\Http\Request;
use App\Services\ByulWebSocketService;
/**
 * Dashboard endpoints: a Server-Sent Events news stream and a JSON summary.
 */
class NewsController extends Controller
{
    /** @var ByulWebSocketService */
    private $webSocketService;

    public function __construct(ByulWebSocketService $webSocketService)
    {
        $this->webSocketService = $webSocketService;
    }

    /**
     * Stream `news:data` messages to the browser as Server-Sent Events.
     *
     * The request may carry 'minImportance', 'symbols' and 'category', which
     * are forwarded unchanged as subscription filters.
     *
     * NOTE(review): the stream callback only registers promise callbacks; it
     * presumably relies on the event loop being run elsewhere for messages to
     * arrive — confirm against the deployment setup.
     */
    public function streamNews(Request $request)
    {
        $filters = $request->only(['minImportance', 'symbols', 'category']);

        return response()->stream(function () use ($filters) {
            $this->webSocketService->connect()
                ->then(function ($conn) use ($filters) {
                    $this->webSocketService->subscribeToNews($filters);

                    // Stream data to browser via Server-Sent Events
                    $conn->on('message', function ($msg) {
                        $data = json_decode($msg->getPayload(), true);

                        // Payloads that are not JSON or carry no "type" are
                        // skipped instead of raising an undefined-key warning
                        // into the event stream.
                        if (($data['type'] ?? '') === 'news:data') {
                            echo "data: " . json_encode($data) . "\n\n";
                            flush();
                        }
                    });
                });
        }, 200, [
            'Content-Type' => 'text/event-stream',
            'Cache-Control' => 'no-cache',
            'Connection' => 'keep-alive',
        ]);
    }

    /**
     * Return a JSON market summary.
     *
     * NOTE(review): countBreakingNews(), getTopCategories() and
     * calculateMarketSentiment() are not defined in this class; they must be
     * supplied by the parent controller or added here — confirm.
     */
    public function getMarketSummary()
    {
        $summary = [
            'breaking_news_count' => $this->countBreakingNews(),
            'top_categories' => $this->getTopCategories(),
            'market_sentiment' => $this->calculateMarketSentiment(),
        ];

        return response()->json($summary);
    }
}
<?php
/**
 * WebSocket client that retries failed connections with exponential backoff
 * and reconnects after the connection is closed.
 */
class RobustWebSocketClient
{
    /** @var \Ratchet\Client\Connector Invokable Pawl connector. */
    private $connector;

    /** @var string|false API key from BYUL_API_KEY (false when unset). */
    private $apiKey;

    /** @var int Maximum number of consecutive retry attempts. */
    private $maxRetries = 5;

    /** @var int Seconds to wait before the next retry. */
    private $retryDelay = 1;

    /** @var int Upper bound for $retryDelay, in seconds. */
    private $maxRetryDelay = 30;

    /** @var int Retry attempts made since the last successful connect. */
    private $currentRetries = 0;

    public function __construct()
    {
        // Fully qualified so the class does not depend on a `use` statement.
        $this->connector = new \Ratchet\Client\Connector();
        $this->apiKey = getenv('BYUL_API_KEY');
    }

    /**
     * Start a connection attempt; the outcome is delivered to onConnect() or
     * onConnectError().
     */
    public function connectWithRetry()
    {
        // The connector is an invokable object held in a property, so it has
        // to be wrapped in parentheses; `$this->connector(...)` would call a
        // method named "connector", which does not exist.
        ($this->connector)('wss://api.byul.ai/news-v2')
            ->then(
                [$this, 'onConnect'],
                [$this, 'onConnectError']
            );
    }

    /**
     * Reset the retry state, wire the event handlers and authenticate.
     *
     * @param \Ratchet\Client\WebSocket $conn Freshly opened connection.
     */
    public function onConnect(\Ratchet\Client\WebSocket $conn)
    {
        echo "✓ WebSocket connected\n";

        $this->currentRetries = 0;
        $this->retryDelay = 1;

        $conn->on('close', [$this, 'onClose']);
        $conn->on('message', [$this, 'onMessage']);

        // Send authentication
        $conn->send(json_encode([
            'type' => 'auth',
            'apiKey' => $this->apiKey,
        ]));
    }

    /**
     * Retry with exponential backoff until $maxRetries is reached.
     *
     * @param \Throwable $error Reason the connection attempt failed.
     */
    public function onConnectError($error)
    {
        echo "✗ Connection failed: " . $error->getMessage() . "\n";

        if ($this->currentRetries < $this->maxRetries) {
            $this->currentRetries++;
            echo "Retrying in {$this->retryDelay}s... (Attempt {$this->currentRetries}/{$this->maxRetries})\n";
            // sleep() blocks the whole process, including any other event-loop work.
            sleep($this->retryDelay);
            // Exponential backoff, capped so the wait cannot grow without bound.
            $this->retryDelay = min($this->retryDelay * 2, $this->maxRetryDelay);
            $this->connectWithRetry();
        } else {
            echo "Max retries exceeded. Giving up.\n";
        }
    }

    /**
     * Reconnect after the server or the network closed the connection.
     *
     * @param int|null    $code   Close code, if provided.
     * @param string|null $reason Close reason, if provided.
     */
    public function onClose($code = null, $reason = null)
    {
        echo "Connection closed. Code: $code, Reason: $reason\n";
        echo "Attempting to reconnect...\n";
        sleep(2);
        $this->connectWithRetry();
    }

    /**
     * Decode an incoming message and pass it to processMessage().
     *
     * @param object $msg Message object exposing getPayload().
     */
    public function onMessage($msg)
    {
        $data = json_decode($msg->getPayload(), true);

        if (!is_array($data)) {
            // Not a JSON object: nothing to dispatch.
            return;
        }

        // Process different message types
        $this->processMessage($data);
    }

    /**
     * Default message handler; override in a subclass to act on messages.
     *
     * @param array $data Decoded message.
     */
    protected function processMessage(array $data)
    {
        $type = $data['type'] ?? 'unknown';
        echo "Received message: {$type}\n";
    }
}
<?php
// Queue messages for processing
use Predis\Client;
/**
 * Redis-backed FIFO queue for news payloads: producers LPUSH, the worker BRPOPs.
 */
class NewsMessageQueue
{
    /** @var Client Predis connection. */
    private $redis;

    /** @var string Redis list key holding the queued messages. */
    private $queueName = 'byul_news_queue';

    public function __construct()
    {
        // Honour REDIS_HOST / REDIS_PORT when set; otherwise use the local defaults.
        $this->redis = new Client([
            'scheme' => 'tcp',
            'host' => getenv('REDIS_HOST') ?: '127.0.0.1',
            'port' => (int) (getenv('REDIS_PORT') ?: 6379),
        ]);
    }

    /**
     * Wrap a news payload with a timestamp and priority and push it on the queue.
     *
     * @param array $newsData Decoded `news:data` message.
     */
    public function enqueue($newsData)
    {
        $message = [
            'timestamp' => microtime(true),
            'data' => $newsData,
            'priority' => $this->calculatePriority($newsData),
        ];

        $this->redis->lpush($this->queueName, json_encode($message));
    }

    /**
     * Worker loop: block up to one second per poll and process each message.
     * Runs until the process is terminated.
     */
    public function processQueue()
    {
        while (true) {
            // brpop yields [queueName, payload], or nothing on timeout.
            $message = $this->redis->brpop($this->queueName, 1);
            if (!$message) {
                continue;
            }

            $data = json_decode($message[1], true);
            if (!is_array($data)) {
                // Corrupt entry: skip it rather than crash the worker.
                continue;
            }

            $this->processNewsMessage($data);
        }
    }

    /**
     * Default handler for a dequeued message; override in a subclass to do
     * real work.
     *
     * @param array $message Envelope created by enqueue().
     */
    protected function processNewsMessage(array $message)
    {
        $priority = $message['priority'] ?? 'normal';
        $articles = $message['data']['data']['news'] ?? [];
        $count = is_array($articles) ? count($articles) : 0;
        echo "Processing {$priority} priority message with {$count} article(s)\n";
    }

    /**
     * Classify a payload as 'high' when any of its articles scores 8 or more.
     *
     * @param array $newsData Decoded `news:data` message.
     *
     * @return string 'high' or 'normal'.
     */
    private function calculatePriority($newsData)
    {
        $articles = $newsData['data']['news'] ?? [];
        $scores = is_array($articles) ? array_column($articles, 'importanceScore') : [];

        // Use the highest score in the batch, not just the first article's.
        $importance = $scores === [] ? 1 : max($scores);

        return $importance >= 8 ? 'high' : 'normal';
    }
}
# Install the Pawl WebSocket client (built on ReactPHP)
composer require react/socket
composer require ratchet/pawl
composer require predis/predis # Needed by the Redis-backed message queue example
composer require react/zmq # Optional: for message queuing
# Alternative: ElephantIO for Socket.IO compatibility
composer require elephantio/elephant.io
<?php
// config/byul.php
// Settings for the Byul news WebSocket integration.
return [
    // API key sent in the auth message.
    'api_key' => env('BYUL_API_KEY'),

    // Endpoint of the news stream; override with BYUL_WEBSOCKET_URL when needed.
    'websocket_url' => env('BYUL_WEBSOCKET_URL', 'wss://api.byul.ai/news-v2'),

    // Filters applied to subscriptions unless the caller overrides them.
    'default_filters' => [
        'minImportance' => 6,
        'limit' => 50,
    ],

    // Reconnect policy: attempt count and delay bounds in seconds.
    'retry_settings' => [
        'max_retries' => 5,
        'initial_delay' => 1,
        'max_delay' => 30,
    ],
];
# .env file
BYUL_API_KEY=byul_api_key
REDIS_HOST=127.0.0.1
REDIS_PORT=6379
# Dockerfile for PHP WebSocket application
FROM php:8.1-cli

# Tools Composer needs to download and unpack packages.
RUN apt-get update && apt-get install -y --no-install-recommends \
    git \
    zip \
    unzip \
    && rm -rf /var/lib/apt/lists/*

# Take Composer from its official image instead of piping an unverified
# installer script from the network into php.
COPY --from=composer:2 /usr/bin/composer /usr/local/bin/composer

WORKDIR /app

# Copy the manifests first so the dependency layer is cached until they change.
# The lock file is optional (the glob still matches composer.json on its own).
COPY composer.json composer.lock* ./
RUN composer install --no-dev --no-interaction --prefer-dist

COPY . .

CMD ["php", "websocket_client.php"]