1: <?php
2:
3: namespace Alchemy\orm;
4: use Alchemy\core\query\Insert;
5: use Alchemy\core\query\Query;
6: use Alchemy\engine\IEngine;
7: use Alchemy\engine\ResultSet;
8: use Alchemy\util\Monad;
9: use Alchemy\util\promise\Promise;
10:
11:
12: /**
13: * Queue for scheduling queries to be run later, in
14: * the proper order
15: */
16: class WorkQueue {
17: protected $queue = array();
18:
19:
20: /**
21: * Delete data based on the given filters
22: *
23: * @param string $cls Class name of DataMapper subclass
24: * @param array $pk array(ColumnName => IQueryValue) UPDATE Filters
25: * @return Promise resolved when DELETE is actually run
26: */
27: public function delete($cls, $pk) {
28: $table = $cls::table();
29: $query = Query::Delete($table)
30: ->where($table->equal($pk));
31:
32: return $this->push($query);
33: }
34:
35:
36: /**
37: * Flush all pending queries to the database
38: *
39: * @param IEngine $engine
40: */
41: public function flush(IEngine $engine) {
42: while ($item = array_shift($this->queue)) {
43: list($query, $promise) = $item;
44: try {
45: $r = $engine->query($query);
46: } catch (\Exception $e) {
47: throw $e;
48: }
49: $promise->resolve($r);
50: }
51: }
52:
53:
54: /**
55: * INSERT data based on the given DataMapper class
56: * and a record of properties.
57: *
58: * @param string $cls Class name of DataMapper subclass
59: * @param array $data Array of properties to send in the INSERT
60: * @return Promise resolved when INSERT is actual run
61: */
62: public function insert($cls, array $data) {
63: $table = $cls::table();
64:
65: $scalars = array();
66: $columns = array();
67: foreach ($data as $name => $value) {
68: $columns[] = $table->$name;
69: $scalars[] = $table->{$name}->schema()->encode($value);
70: }
71:
72: $query = Query::Insert($table)
73: ->columns($columns)
74: ->row($scalars);
75:
76: return $this->push($query);
77: }
78:
79:
80: /**
81: * Push a query onto the end of the queue
82: *
83: * @param Query|Monad Query to push
84: * @return Promise resolved when query is actual run
85: */
86: protected function push($query) {
87: if ($query instanceof Monad) {
88: $query = $query->unwrap();
89: }
90:
91: $promise = new Promise(null, 'Alchemy\engine\ResultSet');
92: $this->queue[] = array($query, $promise);
93: return $promise;
94: }
95:
96:
97: /**
98: * UPDATE data based on the given DataMapper class, a
99: * record of properties to update, and an array keys
100: *
101: * @param string $cls Class name of DataMapper subclass
102: * @param array $pk array(ColumnName => IQueryValue) UPDATE Filters
103: * @param array $data Array of properties to send in the INSERT
104: * @return Promise resolved when INSERT is actually run
105: */
106: public function update($cls, $pk, $data) {
107: $table = $cls::table();
108: $query = Query::Update($table)
109: ->where($table->equal($pk))
110: ->columns($data);
111:
112: return $this->push($query);
113: }
114: }
115: