/**
- Pushes an error message to the logger,when one is configured.
-
- @param string $message The error message.
- @return void
*/
protected function _error($message) {
if ($this->_config['logger']) {
$this->_config['logger']->error($message);
}
}
public function errors()
{
return $this->_config['logger'];
}
/**
- Writes a packet to the socket. Prior to writing to the socket will
- check for availability of the connection.
-
- @param string $data
- @return integer|boolean number of written bytes or
false on error.
*/
protected function _write($data) {
if (!$this->connected) {
$message = 'No connecting found while writing data to socket.';
throw new RuntimeException($message);
}
$data .= "rn";
return fwrite($this->_connection,$data,strlen($data));
}
/**
-
Reads a packet from the socket. Prior to reading from the socket
-
will check for availability of the connection.
-
-
@param integer $length Number of bytes to read.
-
@return string|boolean Data or false on error.
*/
protected function _read($length = null) {
if (!$this->connected) {
$message = 'No connection found while reading data from socket.';
throw new RuntimeException($message);
}
if ($length) {
if (feof($this->_connection)) {
return false;
}
$data = stream_get_contents($this->_connection,$length + 2);
$meta = stream_get_meta_data($this->_connection);
if ($meta['timed_out']) {
$message = 'Connection timed out while reading data from socket.';
throw new RuntimeException($message);
}
$packet = rtrim($data,"rn");
} else {
$packet = stream_get_line($this->_connection,16384,"rn");
}
return $packet;
}
/ Producer Commands /
/**
- The
put command is for any process that wants to insert a job into the queue.
-
- @param integer $pri Jobs with smaller priority values will be scheduled
- before jobs with larger priorities. The most urgent priority is
- 0; the least urgent priority is 4294967295.
- @param integer $delay Seconds to wait before putting the job in the
- ready queue. The job will be in the "delayed" state during this time.
- @param integer $ttr Time to run - Number of seconds to allow a worker to
- run this job. The minimum ttr is 1.
- @param string $data The job body.
- @return integer|boolean
false on error otherwise an integer indicating
- the job id.
*/
public function put($pri,$delay,$ttr,$data) {
$this->_write(sprintf("put %d %d %d %drn%s",$pri,strlen($data),$data));
$status = strtok($this->_read(),' ');
switch ($status) {
case 'INSERTED':
case 'BURIED':
return (integer) strtok(' '); // job id
case 'EXPECTED_CRLF':
case 'JOB_TOO_BIG':
default:
$this->_error($status);
return false;
}
}
/**
- The
use command is for producers. Subsequent put commands will put
- jobs into the tube specified by this command. If no use command has
- been issued,jobs will be put into the tube named
default .
-
- @param string $tube A name at most 200 bytes. It specifies the tube to
- use. If the tube does not exist,it will be created.
- @return string|boolean
false on error otherwise the name of the tube.
*/
public function useTube($tube) {
$this->_write(sprintf('use %s',$tube));
$status = strtok($this->_read(),' ');
switch ($status) {
case 'USING':
return strtok(' ');
default:
$this->_error($status);
return false;
}
}
/**
- Pause a tube delaying any new job in it being reserved for a given time.
-
- @param string $tube The name of the tube to pause.
- @param integer $delay Number of seconds to wait before reserving any more
- jobs from the queue.
- @return boolean
false on error otherwise true .
*/
public function pauseTube($tube,$delay) {
$this->_write(sprintf('pause-tube %s %d',$tube,$delay));
$status = strtok($this->_read(),' ');
switch ($status) {
case 'PAUSED':
return true;
case 'NOT_FOUND':
default:
$this->_error($status);
return false;
}
}
/ Worker Commands / (编辑:晋中站长网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|