C:/lib/adodb/replicate/adodb-replicate.inc.php Quellcode

adodb-replicate.inc.php
gehe zur Dokumentation dieser Datei
1 <?php
2 
3 define('ADODB_REPLICATE',1.2);
4 
5 include_once(ADODB_DIR.'/adodb-datadict.inc.php');
6 
7 /*
8 1.2 9 June 2009
9 Minor patches
10 
11 1.1 8 June 2009
12 Added $lastUpdateFld to replicatedata
13 Added $rep->compat. If compat set to 1.0, then $lastUpdateFld not used during MergeData.
14 
15 1.0 Apr 2009
16 Added support for MFFA
17 
18 0.9 ? 2008
19 First release
20 
21 
22  Note: this code assumes that comments such as / * * / ar`e allowed which works with:
23  Note: this code assumes that comments such as / * * / are allowed which works with:
24  mssql, postgresql, oracle, mssql
25 
26  Replication engine to
27  - copy table structures and data from different databases (e.g. mysql to oracle)
28  for replication purposes
29  - generate CREATE TABLE, CREATE INDEX, INSERT ... for installation scripts
30 
31  Table Structure copying includes
32  - fields and limited subset of types
33  - optional default values
34  - indexes
35  - but not constraints
36 
37 
38  Two modes of data copy:
39 
40  ReplicateData
41  - Copy from src to dest, with update of status of copy back to src,
42  with configurable src SELECT where clause
43 
44  MergeData
45  - Copy from src to dest based on last mod date field and/or copied flag field
46 
47  Default settings are
48  - do not execute, generate sql ($rep->execute = false)
49  - do not delete records in dest table first ($rep->deleteFirst = false).
50  if $rep->deleteFirst is true and primary keys are defined,
51  then no deletion will occur unless *INSERTONLY* is defined in pkey array
52  - only commit once at the end of every ReplicateData ($rep->commitReplicate = true)
53  - do not autocommit every x records processed ($rep->commitRecs = -1)
54  - even if error occurs on one record, continue copying remaining records ($rep->neverAbort = true)
55  - debugging turned off ($rep->debug = false)
56 */
57 
59  var $connSrc;
60  var $connDest;
61 
62  var $connSrc2 = false;
63  var $connDest2 = false;
64  var $ddSrc;
65  var $ddDest;
66 
67  var $execute = false;
68  var $debug = false;
69  var $deleteFirst = false;
70  var $commitReplicate = true; // commit at end of replicatedata
71  var $commitRecs = -1; // only commit at end of ReplicateData()
72 
73  var $selFilter = false;
74  var $fieldFilter = false;
75  var $indexFilter = false;
76  var $updateFilter = false;
77  var $insertFilter = false;
78  var $updateSrcFn = false;
79 
80  var $limitRecs = false;
81 
82  var $neverAbort = true;
83  var $copyTableDefaults = false; // turn off because functions defined as defaults will not work when copied
84  var $errHandler = false; // name of error handler function, if used.
85  var $htmlSpecialChars = true; // if execute false, then output with htmlspecialchars enabled.
86  // Will autoconfigure itself. No need to modify
87 
88  var $trgSuffix = '_mrgTr';
89  var $idxSuffix = '_mrgidx';
90  var $trLogic = '1 = 1';
91  var $datesAreTimeStamps = false;
92 
93  var $oracleSequence = false;
94  var $readUncommitted = false; // read without obeying shared locks for fast select (mssql)
95 
96  var $compat = false;
97  // connSrc2 and connDest2 are only required if the db driver
98  // does not allow updates back to src db in first connection (the select connection),
99  // so we need 2nd connection
101  {
102 
103  if (strpos($connSrc->databaseType,'odbtp') !== false) {
104  $connSrc->_bindInputArray = false; # bug in odbtp, binding fails
105  }
106 
107  if (strpos($connDest->databaseType,'odbtp') !== false) {
108  $connDest->_bindInputArray = false; # bug in odbtp, binding fails
109  }
110 
111  $this->connSrc = $connSrc;
112  $this->connDest = $connDest;
113 
114  $this->connSrc2 = ($connSrc2) ? $connSrc2 : $connSrc;
115  $this->connDest2 = ($connDest2) ? $connDest2 : $connDest;
116 
117  $this->ddSrc = NewDataDictionary($connSrc);
118  $this->ddDest = NewDataDictionary($connDest);
119  $this->htmlSpecialChars = isset($_SERVER['HTTP_HOST']);
120  }
121 
122  function ExecSQL($sql)
123  {
124  if (!is_array($sql)) $sql[] = $sql;
125 
126  $ret = true;
127  foreach($sql as $s)
128  if (!$this->execute) echo "<pre>",$s.";\n</pre>";
129  else {
130  $ok = $this->connDest->Execute($s);
131  if (!$ok)
132  if ($this->neverAbort) $ret = false;
133  else return false;
134  }
135 
136  return $ret;
137  }
138 
139  /*
140  We assume replication between $table and $desttable only works if the field names and types match for both tables.
141 
142  Also $table and desttable can have different names.
143  */
144 
145  function CopyTableStruct($table,$desttable='')
146  {
147  $sql = $this->CopyTableStructSQL($table,$desttable);
148  if (empty($sql)) return false;
149  return $this->ExecSQL($sql);
150  }
151 
152  function RunFieldFilter(&$fld, $mode = '')
153  {
154  if ($this->fieldFilter) {
155  $fn = $this->fieldFilter;
156  return $fn($fld, $mode);
157  } else
158  return $fld;
159  }
160 
161  function RunUpdateFilter($table, $fld, $val)
162  {
163  if ($this->updateFilter) {
164  $fn = $this->updateFilter;
165  return $fn($table, $fld, $val);
166  } else
167  return $val;
168  }
169 
170  function RunInsertFilter($table, $fld, &$val)
171  {
172  if ($this->insertFilter) {
173  $fn = $this->insertFilter;
174  return $fn($table, $fld, $val);
175  } else
176  return $fld;
177  }
178 
179  /*
180  $mode = INS or UPD
181 
182  The lastUpdateFld holds the field that counts the number of updates or the date of last mod. This ensures that
183  if the rec was modified after replicatedata retrieves the data but before we update back the src record,
184  we don't set the copiedflag='Y' yet.
185  */
186  function RunUpdateSrcFn($srcdb, $table, $fldoffsets, $row, $where, $mode, $dest_insertid=null, $lastUpdateFld='')
187  {
188  if (!$this->updateSrcFn) return;
189 
190  $bindarr = array();
191  foreach($fldoffsets as $k) {
192  $bindarr[$k] = $row[$k];
193  }
194  $last = sizeof($row);
195 
196  if ($lastUpdateFld && $row[$last-1]) {
197  $ds = $row[$last-1];
198  if (strpos($ds,':') !== false) $s = $srcdb->DBTimeStamp($ds);
199  else $s = $srcdb->qstr($ds);
200  $where = "WHERE $lastUpdateFld = $s and $where";
201  } else
202  $where = "WHERE $where";
203  $fn = $this->updateSrcFn;
204  if (is_array($fn)) {
205  if (sizeof($fn) == 1) $set = reset($fn);
206  else $set = @$fn[$mode];
207  if ($set) {
208 
209  if (strlen($dest_insertid) == 0) $dest_insertid = 'null';
210  $set = str_replace('$INSERT_ID',$dest_insertid,$set);
211 
212  $sql = "UPDATE $table SET $set $where ";
213  $ok = $srcdb->Execute($sql,$bindarr);
214  if (!$ok) {
215  echo $srcdb->ErrorMsg(),"<br>\n";
216  die();
217  }
218  }
219  } else $fn($srcdb, $table, $row, $where, $bindarr, $mode, $dest_insertid);
220 
221  }
222 
223  function CopyTableStructSQL($table, $desttable='',$dropdest =false)
224  {
225  if (!$desttable) {
226  $desttable = $table;
227  $prefixidx = '';
228  } else
229  $prefixidx = $desttable;
230 
231  $conn = $this->connSrc;
232  $types = $conn->MetaColumns($table);
233  if (!$types) {
234  echo "$table does not exist in source db<br>\n";
235  return array();
236  }
237  if (!$dropdest && $this->connDest->MetaColumns($desttable)) {
238  echo "$desttable already exists in dest db<br>\n";
239  return array();
240  }
241  if ($this->debug) var_dump($types);
242  $sa = array();
243  $idxcols = array();
244 
245  foreach($types as $name => $t) {
246  $s = '';
247  $mt = $this->ddSrc->MetaType($t->type);
248  $len = $t->max_length;
249  $fldname = $this->RunFieldFilter($t->name,'TABLE');
250  if (!$fldname) continue;
251 
252  $s .= $fldname . ' '.$mt;
253  if (isset($t->scale)) $precision = '.'.$t->scale;
254  else $precision = '';
255  if ($mt == 'C' or $mt == 'X') $s .= "($len)";
256  else if ($mt == 'N' && $precision) $s .= "($len$precision)";
257 
258  if ($mt == 'R') $idxcols[] = $fldname;
259 
260  if ($this->copyTableDefaults) {
261  if (isset($t->default_value)) {
262  $v = $t->default_value;
263  if ($mt == 'C' or $mt == 'X') $v = $this->connDest->qstr($v); // might not work as this could be function
264  $s .= ' DEFAULT '.$v;
265  }
266  }
267 
268  $sa[] = $s;
269  }
270 
271  $s = implode(",\n",$sa);
272 
273  // dump adodb intermediate data dictionary format
274  if ($this->debug) echo '<pre>'.$s.'</pre>';
275 
276  $sqla = $this->ddDest->CreateTableSQL($desttable,$s);
277 
278  /*
279  if ($idxcols) {
280  $idxoptions = array('UNIQUE'=>1);
281  $sqla2 = $this->ddDest->_IndexSQL($table.'_'.$fldname.'_SERIAL', $desttable, $idxcols,$idxoptions);
282  $sqla = array_merge($sqla,$sqla2);
283  }*/
284 
285  $idxs = $conn->MetaIndexes($table);
286  if ($idxs)
287  foreach($idxs as $name => $iarr) {
288  $idxoptions = array();
289  $fldnames = array();
290 
291  if(!empty($iarr['unique'])) {
292  $idxoptions['UNIQUE'] = 1;
293  }
294 
295  foreach($iarr['columns'] as $fld) {
296  $fldnames[] = $this->RunFieldFilter($fld,'TABLE');
297  }
298 
299  $idxname = $prefixidx.str_replace($table,$desttable,$name);
300 
301  if (!empty($this->indexFilter)) {
302  $fn = $this->indexFilter;
303  $idxname = $fn($desttable,$idxname,$fldnames,$idxoptions);
304  }
305  $sqla2 = $this->ddDest->_IndexSQL($idxname, $desttable, $fldnames,$idxoptions);
306  $sqla = array_merge($sqla,$sqla2);
307  }
308 
309  return $sqla;
310  }
311 
312  function _clearcache()
313  {
314 
315  }
316 
317  function _concat($v)
318  {
319  return $this->connDest->concat("' ","chr(".ord($v).")","'");
320  }
321 
322  function fixupbinary($v)
323  {
324  return str_replace(
325  array("\r","\n"),
326  array($this->_concat("\r"),$this->_concat("\n")),
327  $v );
328  }
329 
330  function SwapDBs()
331  {
332  $o = $this->connSrc;
333  $this->connSrc = $this->connDest;
334  $this->connDest = $o;
335 
336 
337  $o = $this->connSrc2;
338  $this->connSrc2 = $this->connDest2;
339  $this->connDest2 = $o;
340 
341  $o = $this->ddSrc;
342  $this->ddSrc = $this->ddDest;
343  $this->ddDest = $o;
344  }
345 
346  /*
347  // if no uniqflds defined, then all desttable recs will be deleted before insert
348  // $where clause must include the WHERE word if used
349  // if $this->commitRecs is set to a +ve value, then it will autocommit every $this->commitRecs records
350  // -- this should never be done with 7x24 db's
351 
352  Returns an array:
353  $arr[0] = true if no error, false if error
354  $arr[1] = number of recs processed
355  $arr[2] = number of successful inserts
356  $arr[3] = number of successful updates
357 
358  ReplicateData() params:
359 
360  $table = src table name
361  $desttable = dest table name, leave blank to use src table name
362  $uniqflds = array() = an array. If set, then inserts and updates will occur. eg. array('PK1', 'PK2');
363  To prevent updates to desttable (allow only to src table), add '*INSERTONLY*' or '*ONLYINSERT*' to array.
364 
365  Sometimes you are replicating a src table with an autoinc primary key.
366  You sometimes create recs in the dest table. The dest table has to retrieve the
367  src table's autoinc key (stored in a 2nd field) so you can match the two tables.
368 
369  To define this, and the uniqflds contains nested arrays. Copying from autoinc table to other table:
370  array(array($destpkey), array($destfld_holds_src_autoinc_pkey))
371 
372  Copying from normal table to autoinc table:
373  array(array($destpkey), array(), array($srcfld_holds_dest_autoinc_pkey))
374 
375  $where = where clause for SELECT from $table $where. Include the WHERE reserved word in beginning.
376  You can put ORDER BY at the end also
377  $ignoreflds = array(), list of fields to ignore. e.g. array('FLD1',FLD2');
378  $dstCopyDateFld = date field on $desttable to update with current date
379  $extraflds allows you to add additional flds to insert/update. Format
380  array(fldname => $fldval)
381  $fldval itself can be an array or a string. If an array, then
382  $extraflds = array($fldname => array($insertval, $updateval))
383 
384  Thus we have the following behaviours:
385 
386  a. Delete all data in $desttable then insert from src $table
387 
388  $rep->execute = true;
389  $rep->ReplicateData($table, $desttable)
390 
391  b. Update $desttable if record exists (based on $uniqflds), otherwise insert.
392 
393  $rep->execute = true;
394  $rep->ReplicateData($table, $desttable, $array($pkey1, $pkey2))
395 
396  c. Select from src $table all data modified since a date. Then update $desttable
397  if record exists (based on $uniqflds), otherwise insert
398 
399  $rep->execute = true;
400  $rep->ReplicateData($table, $desttable, array($pkey1, $pkey2), "WHERE update_datetime_fld > $LAST_REFRESH")
401 
402  d. Insert all records into $desttable modified after a certain id (or time) in src $table:
403 
404  $rep->execute = true;
405  $rep->ReplicateData($table, $desttable, false, "WHERE id_fld > $LAST_ID_SAVED", true);
406 
407 
408  For (a) to (d), returns array: array($boolean_ok_fail, $no_recs_selected_from_src_db, $no_recs_inserted, $no_recs_updated);
409 
410  e. Generate sample SQL:
411 
412  $rep->execute = false;
413  $rep->ReplicateData(....);
414 
415  This returns $array, which contains:
416 
417  $array['SEL'] = select stmt from src db
418  $array['UPD'] = update stmt to dest db
419  $array['INS'] = insert stmt to dest db
420 
421 
422  Error-handling
423  ==============
424  Default is never abort if error occurs. You can set $rep->neverAbort = false; to force replication to abort if an error occurs.
425 
426 
427  Value Filtering
428  ========
429  Sometimes you might need to modify/massage the data before the code works. Assume that the value used for True and False is
430  'T' and 'F' in src DB, but is 'Y' and 'N' in dest DB for field[2] in select stmt. You can do this by
431 
432  $rep->filterSelect = 'filter';
433  $rep->ReplicateData(...);
434 
435  function filter($table,& $fields, $deleteFirst)
436  {
437  if ($table == 'SOMETABLE') {
438  if ($fields[2] == 'T') $fields[2] = 'Y';
439  else if ($fields[2] == 'F') $fields[2] = 'N';
440  }
441  }
442 
443  We pass in $deleteFirst as that determines the order of the fields (which are numeric-based):
444  TRUE: the order of fields matches the src table order
445  FALSE: the order of fields is all non-primary key fields first, followed by primary key fields. This is because it needs
446  to match the UPDATE statement, which is UPDATE $table SET f2 = ?, f3 = ? ... WHERE f1 = ?
447 
448  Name Filtering
449  =========
450  Sometimes field names that are legal in one RDBMS can be illegal in another.
451  We allow you to handle this using a field filter.
452  Also if you don't want to replicate certain fields, just return false.
453 
454  $rep->fieldFilter = 'ffilter';
455 
456  function ffilter(&$fld,$mode)
457  {
458  $uf = strtoupper($fld);
459  switch($uf) {
460  case 'GROUP':
461  if ($mode == 'SELECT') $fld = '"Group"';
462  return 'GroupFld';
463 
464  case 'PRIVATEFLD': # do not replicate
465  return false;
466  }
467  return $fld;
468  }
469 
470 
471  UPDATE FILTERING
472  ================
473  Sometimes, when we want to update
474  UPDATE table SET fld = val WHERE ....
475 
476  we want to modify val. To do so, define
477 
478  $rep->updateFilter = 'ufilter';
479 
480  function ufilter($table, $fld, $val)
481  {
482  return "nvl($fld, $val)";
483  }
484 
485 
486  Sending back audit info back to src Table
487  =========================================
488 
489  Use $rep->updateSrcFn. This can be an array of strings, or the name of a php function to call.
490 
491  If an array of strings is defined, then it will perform an update statement...
492 
493  UPDATE srctable SET $string WHERE ....
494 
495  With $string set to the array you define. If a new record was inserted into desttable, then the
496  'INS' string is used ($INSERT_ID will be replaced with the real INSERT_ID, if any),
497  and if an update then use the 'UPD' string.
498 
499  array(
500  'INS' => 'insertid = $INSERT_ID, copieddate=getdate(), copied = 1',
501  'UPD' => 'copieddate=getdate(), copied = 1'
502  )
503 
504  If a single string array is defined, then it will be used for both insert and update.
505  array('copieddate=getdate(), copied = 1')
506 
507  Note that the where clause is automatically defined by the system.
508 
509  If $rep->updateSrcFn is a PHP function name, then it will be called with the following params:
510 
511  $fn($srcConnection, $tableName, $row, $where, $bindarr, $mode, $dest_insertid)
512 
513  $srcConnection - source db connection
514  $tableName - source tablename
515  $row - array holding records updated into dest
516  $where - where clause to be used (uses bind vars)
517  $bindarr - array holding bind variables for where clause
518  $mode - INS or UPD
519  $dest_insertid - when mode=INS, then the insert_id is stored here.
520 
521 
522  oracle mssql
523  ---> insert
524  mssqlid <--- insert_id
525  ----> update with mssqlid
526  <---- update with mssqlid
527 
528 
529  TODO: add src pkey and dest pkey for updates. Also sql stmt needs to be tuned, so dest pkey, src pkey
530  */
531 
532 
533  function ReplicateData($table, $desttable = '', $uniqflds = array(), $where = '',$ignore_flds = array(),
534  $dstCopyDateFld='', $extraflds = array(), $lastUpdateFld = '')
535  {
536  if (is_array($where)) {
537  $wheresrc = $where[0];
538  $wheredest = $where[1];
539  } else {
540  $wheresrc = $wheredest = $where;
541  }
542  $dstCopyDateName = $dstCopyDateFld;
543  $dstCopyDateFld = strtoupper($dstCopyDateFld);
544 
545  $this->_clearcache();
546  if (is_string($uniqflds) && strlen($uniqflds)) $uniqflds = array($uniqflds);
547  if (!$desttable) $desttable = $table;
548 
549  $uniq = array();
550  if ($uniqflds) {
551  if (is_array(reset($uniqflds))) {
552  /*
553  primary key of src and dest tables differ. This means when we perform the select stmts
554  we retrieve both keys. Then any insert statement will have to ignore one array element.
555  Any update statement will need to use a different where clause
556  */
557  $destuniqflds = $uniqflds[0];
558  if (sizeof($uniqflds)>1 && $uniqflds[1]) // srckey field name in dest table
559  $srcuniqflds = $uniqflds[1];
560  else
561  $srcuniqflds = array();
562 
563  if (sizeof($uniqflds)>2)
564  $srcPKDest = reset($uniqflds[2]);
565 
566  } else {
567  $destuniqflds = $uniqflds;
568  $srcuniqflds = array();
569  }
570  $onlyInsert = false;
571  foreach($destuniqflds as $k => $u) {
572  if ($u == '*INSERTONLY*' || $u == '*ONLYINSERT*') {
573  $onlyInsert = true;
574  continue;
575  }
576  $uniq[strtoupper($u)] = $k;
577  }
579  } else {
580  $deleteFirst = true;
581  }
582 
583  if ($deleteFirst) $onlyInsert = true;
584 
585  if ($ignore_flds) {
586  foreach($ignore_flds as $u) {
587  $ignoreflds[strtoupper($u)] = 1;
588  }
589  } else
590  $ignoreflds = array();
591 
592  $src = $this->connSrc;
593  $dest = $this->connDest;
594  $src2 = $this->connSrc2;
595 
596  $dest->noNullStrings = false;
597  $src->noNullStrings = false;
598  $src2->noNullStrings = false;
599 
600  if ($src === $dest) $this->execute = false;
601 
602  $types = $src->MetaColumns($table);
603  if (!$types) {
604  echo "Source $table does not exist<br>\n";
605  return array();
606  }
607  $dtypes = $dest->MetaColumns($desttable);
608  if (!$dtypes) {
609  echo "Destination $desttable does not exist<br>\n";
610  return array();
611  }
612  $sa = array();
613  $selflds = array();
614  $wheref = array();
615  $wheres = array();
616  $srcwheref = array();
617  $fldoffsets = array();
618  $k = 0;
619  foreach($types as $name => $t) {
620  $name2 = strtoupper($this->RunFieldFilter($name,'SELECT'));
621  // handle quotes
622  if ($name2 && $name2[0] == '"' && $name2[strlen($name2)-1] == '"') $name22 = substr($name2,1,strlen($name2)-2);
623  elseif ($name2 && $name2[0] == '`' && $name2[strlen($name2)-1] == '`') $name22 = substr($name2,1,strlen($name2)-2);
624  else $name22 = $name2;
625 
626  //else $name22 = $name2; // this causes problem for quotes strip above
627 
628  if (!isset($dtypes[($name22)]) || !$name2) {
629  if ($this->debug) echo " Skipping $name ==> $name2 as not in destination $desttable<br>";
630  continue;
631  }
632 
633  if ($name2 == $dstCopyDateFld) {
634  $dstCopyDateName = $t->name;
635  continue;
636  }
637 
638  $fld = $t->name;
639  $fldval = $t->name;
640  $mt = $src->MetaType($t->type);
641  if ($this->datesAreTimeStamps && $mt == 'D') $mt = 'T';
642  if ($mt == 'D') $fldval = $dest->DBDate($fldval);
643  elseif ($mt == 'T') $fldval = $dest->DBTimeStamp($fldval);
644  $ufld = strtoupper($fld);
645 
646  if (isset($ignoreflds[($name2)]) && !isset($uniq[$ufld])) {
647  continue;
648  }
649 
650  if ($this->debug) echo " field=$fld type=$mt fldval=$fldval<br>";
651 
652  if (!isset($uniq[$ufld])) {
653 
654  $selfld = $fld;
655  $fld = $this->RunFieldFilter($selfld,'SELECT');
656  $selflds[] = $selfld;
657 
658  $p = $dest->Param($k);
659 
660  if ($mt == 'D') $p = $dest->DBDate($p, true);
661  else if ($mt == 'T') $p = $dest->DBTimeStamp($p, true);
662 
663  # UPDATES
664  $sets[] = "$fld = ".$this->RunUpdateFilter($desttable, $fld, $p);
665 
666  # INSERTS
667  $insflds[] = $this->RunInsertFilter($desttable,$fld, $p); $params[] = $p;
668  $k++;
669  } else {
670  $fld = $this->RunFieldFilter($fld);
671  $wheref[] = $fld;
672  if (!empty($srcuniqflds)) $srcwheref[] = $srcuniqflds[$uniq[$ufld]];
673  if ($mt == 'C') { # normally we don't include the primary key in the insert if it is numeric, but ok if varchar
674  $insertpkey = true;
675  }
676  }
677  }
678 
679 
680  foreach($extraflds as $fld => $evals) {
681  if (!is_array($evals)) $evals = array($evals, $evals);
682  $insflds[] = $this->RunInsertFilter($desttable,$fld, $p); $params[] = $evals[0];
683  $sets[] = "$fld = ".$evals[1];
684  }
685 
686  if ($dstCopyDateFld) {
687  $sets[] = "$dstCopyDateName = ".$dest->sysTimeStamp;
688  $insflds[] = $this->RunInsertFilter($desttable,$dstCopyDateName, $p); $params[] = $dest->sysTimeStamp;
689  }
690 
691 
692  if (!empty($srcPKDest)) {
693  $selflds[] = $srcPKDest;
694  $fldoffsets = array($k+1);
695  }
696 
697  foreach($wheref as $uu => $fld) {
698 
699  $p = $dest->Param($k);
700  $sp = $src->Param($k);
701  if (!empty($srcuniqflds)) {
702  if ($uu > 1) die("Only one primary key for srcuniqflds allowed currently");
703  $destsrckey = reset($srcuniqflds);
704  $wheres[] = reset($srcuniqflds).' = '.$p;
705 
706  $insflds[] = $this->RunInsertFilter($desttable,$destsrckey, $p);
707  $params[] = $p;
708  } else {
709  $wheres[] = $fld.' = '.$p;
710  if (!isset($ignoreflds[strtoupper($fld)]) || !empty($insertpkey)) {
711  $insflds[] = $this->RunInsertFilter($desttable,$fld, $p);
712  $params[] = $p;
713  }
714  }
715 
716  $selflds[] = $fld;
717  $srcwheres[] = $fld.' = '.$sp;
718  $fldoffsets[] = $k;
719 
720  $k++;
721  }
722 
723  if (!empty($srcPKDest)) {
724  $fldoffsets = array($k);
725  $srcwheres = array($fld.'='.$src->Param($k));
726  $k++;
727  }
728 
729  if ($lastUpdateFld) {
730  $selflds[] = $lastUpdateFld;
731  } else
732  $selflds[] = 'null as Z55_DUMMY_LA5TUPD';
733 
734  $insfldss = implode(', ', $insflds);
735  $fldss = implode(', ', $selflds);
736  $setss = implode(', ', $sets);
737  $paramss = implode(', ', $params);
738  $wheress = implode(' AND ', $wheres);
739  if (isset($srcwheres))
740  $srcwheress = implode(' AND ',$srcwheres);
741 
742 
743  $seltable = $table;
744  if ($this->readUncommitted && strpos($src->databaseType,'mssql')) $seltable .= ' with (NOLOCK)';
745 
746  $sa['SEL'] = "SELECT $fldss FROM $seltable $wheresrc";
747  $sa['INS'] = "INSERT INTO $desttable ($insfldss) VALUES ($paramss) /**INS**/";
748  $sa['UPD'] = "UPDATE $desttable SET $setss WHERE $wheress /**UPD**/";
749 
750 
751 
752  $DB1 = "/* <font color=green> Source DB - sample sql in case you need to adapt code\n\n";
753  $DB2 = "/* <font color=green> Dest DB - sample sql in case you need to adapt code\n\n";
754 
755  if (!$this->execute) echo '/*<style>
756 pre {
757 white-space: pre-wrap; /* css-3 */
758 white-space: -moz-pre-wrap !important; /* Mozilla, since 1999 */
759 white-space: -pre-wrap; /* Opera 4-6 */
760 white-space: -o-pre-wrap; /* Opera 7 */
761 word-wrap: break-word; /* Internet Explorer 5.5+ */
762 }
763 </style><pre>*/
764 ';
765  if ($deleteFirst && $this->deleteFirst) {
766  $where = preg_replace('/[ \n\r\t]+order[ \n\r\t]+by.*$/i', '', $where);
767  $sql = "DELETE FROM $desttable $wheredest\n";
768  if (!$this->execute) echo $DB2,'</font>*/',$sql,"\n";
769  else $dest->Execute($sql);
770  }
771 
772  global $ADODB_COUNTRECS;
773  $err = false;
774  $savemode = $src->setFetchMode(ADODB_FETCH_NUM);
775  $ADODB_COUNTRECS = false;
776 
777  if (!$this->execute) {
778  echo $DB1,$sa['SEL'],"</font>\n*/\n\n";
779  echo $DB2,$sa['INS'],"</font>\n*/\n\n";
780  $suffix = ($onlyInsert) ? ' PRIMKEY=?' : '';
781  echo $DB2,$sa['UPD'],"$suffix</font>\n*/\n\n";
782 
783  $rs = $src->Execute($sa['SEL']);
784  $cnt = 1;
785  $upd = 0;
786  $ins = 0;
787 
788  $sqlarr = explode('?',$sa['INS']);
789  $nparams = sizeof($sqlarr)-1;
790 
791  $useQmark = $dest && ($dest->dataProvider != 'oci8');
792 
793  while ($rs && !$rs->EOF) {
794  if ($useQmark) {
795  $sql = ''; $i = 0;
796  $arr = array_reverse($rs->fields);
797  //Use each() instead of foreach to reduce memory usage -mikefedyk
798  while(list(, $v) = each($arr)) {
799  $sql .= $sqlarr[$i];
800  // from Ron Baldwin <ron.baldwin#sourceprose.com>
801  // Only quote string types
802  $typ = gettype($v);
803  if ($typ == 'string')
804  //New memory copy of input created here -mikefedyk
805  $sql .= $dest->qstr($v);
806  else if ($typ == 'double')
807  $sql .= str_replace(',','.',$v); // locales fix so 1.1 does not get converted to 1,1
808  else if ($typ == 'boolean')
809  $sql .= $v ? $dest->true : $dest->false;
810  else if ($typ == 'object') {
811  if (method_exists($v, '__toString')) $sql .= $dest->qstr($v->__toString());
812  else $sql .= $dest->qstr((string) $v);
813  } else if ($v === null)
814  $sql .= 'NULL';
815  else
816  $sql .= $v;
817  $i += 1;
818 
819  if ($i == $nparams) break;
820  } // while
821 
822  if (isset($sqlarr[$i])) {
823  $sql .= $sqlarr[$i];
824  }
825  $INS = $sql;
826  } else {
827  $INS = $sa['INS'];
828  $arr = array_reverse($rs->fields);
829  foreach($arr as $k => $v) { // only works on oracle currently
830  $k = sizeof($arr)-$k-1;
831  $v = str_replace(":","%~%COLON%!%",$v);
832  $INS = str_replace(':'.$k,$this->fixupbinary($dest->qstr($v)),$INS);
833  }
834  $INS = str_replace("%~%COLON%!%",":",$INS);
835  if ($this->htmlSpecialChars) $INS = htmlspecialchars($INS);
836  }
837  echo "-- $cnt\n",$INS,";\n\n";
838  $cnt += 1;
839  $ins += 1;
840  $rs->MoveNext();
841  }
842  $src->setFetchMode($savemode);
843  return $sa;
844  } else {
845  $saved = $src->debug;
846  #$src->debug=1;
847  if ($this->limitRecs>100)
848  $rs = $src->SelectLimit($sa['SEL'],$this->limitRecs);
849  else
850  $rs = $src->Execute($sa['SEL']);
851  $src->debug = $saved;
852  if (!$rs) {
853  if ($this->errHandler) $this->_doerr('SEL',array());
854  return array(0,0,0,0);
855  }
856 
857 
858  if ($this->commitReplicate || $commitRecs > 0) {
859  $dest->BeginTrans();
860  if ($this->updateSrcFn) $src2->BeginTrans();
861  }
862 
863  if ($this->updateSrcFn && strpos($src2->databaseType,'mssql') !== false) {
864  # problem is writers interfere with readers in mssql
865  $rs = $src->_rs2rs($rs);
866  }
867  $cnt = 0;
868  $upd = 0;
869  $ins = 0;
870 
871  $sizeofrow = sizeof($selflds);
872 
873  $fn = $this->selFilter;
874  $commitRecs = $this->commitRecs;
875 
876  $saved = $dest->debug;
877 
878  if ($this->deleteFirst) $onlyInsert = true;
879  while ($origrow = $rs->FetchRow()) {
880 
881  if ($dest->debug) {flush(); @ob_flush();}
882 
883  if ($fn) {
884  if (!$fn($desttable, $origrow, $deleteFirst, $this, $selflds)) continue;
885  }
886  $doinsert = true;
887  $row = array_slice($origrow,0,$sizeofrow-1);
888 
889  if (!$onlyInsert) {
890  $doinsert = false;
891  $upderr = false;
892 
893  if (isset($srcPKDest)) {
894  if (is_null($origrow[$sizeofrow-3])) {
895  $doinsert = true;
896  $upderr = true;
897  }
898  }
899  if (!$upderr && !$dest->Execute($sa['UPD'],$row)) {
900  $err = true;
901  $upderr = true;
902  if ($this->errHandler) $this->_doerr('UPD',$row);
903  if (!$this->neverAbort) break;
904  }
905 
906  if ($upderr || $dest->Affected_Rows() == 0) {
907  $doinsert = true;
908  } else {
909  if (!empty($uniqflds)) $this->RunUpdateSrcFn($src2, $table, $fldoffsets, $origrow, $srcwheress, 'UPD', null, $lastUpdateFld);
910  $upd += 1;
911  }
912  }
913 
914  if ($doinsert) {
915  $inserr = false;
916  if (isset($srcPKDest)) {
917  $row = array_slice($origrow,0,$sizeofrow-2);
918  }
919 
920  if (! $dest->Execute($sa['INS'],$row)) {
921  $err = true;
922  $inserr = true;
923  if ($this->errHandler) $this->_doerr('INS',$row);
924  if ($this->neverAbort) continue;
925  else break;
926  } else {
927  if ($dest->dataProvider == 'oci8') {
928  if ($this->oracleSequence) $lastid = $dest->GetOne("select ".$this->oracleSequence.".currVal from dual");
929  else $lastid = 'null';
930  } else {
931  $lastid = $dest->Insert_ID();
932  }
933 
934  if (!$inserr && !empty($uniqflds)) {
935  $this->RunUpdateSrcFn($src2, $table, $fldoffsets, $origrow, $srcwheress, 'INS', $lastid,$lastUpdateFld);
936  }
937  $ins += 1;
938  }
939  }
940  $cnt += 1;
941 
942  if ($commitRecs > 0 && ($cnt % $commitRecs) == 0) {
943  $dest->CommitTrans();
944  $dest->BeginTrans();
945 
946  if ($this->updateSrcFn) {
947  $src2->CommitTrans();
948  $src2->BeginTrans();
949  }
950  }
951 
952  } // while
953 
954 
955  if ($this->commitReplicate || $commitRecs > 0) {
956  if (!$this->neverAbort && $err) {
957  $dest->RollbackTrans();
958  if ($this->updateSrcFn) $src2->RollbackTrans();
959  } else {
960  $dest->CommitTrans();
961  if ($this->updateSrcFn) $src2->CommitTrans();
962  }
963  }
964  }
965  if ($cnt != $ins + $upd) echo "<p>ERROR: $cnt != INS $ins + UPD $upd</p>";
966  $src->setFetchMode($savemode);
967  return array(!$err, $cnt, $ins, $upd);
968  }
969  // trigger support only for sql server and oracle
970  // need to add
971  function MergeSrcSetup($srcTable, $pkeys, $srcUpdateDateFld, $srcCopyDateFld, $srcCopyFlagFld,
972  $srcCopyFlagType='C(1)', $srcCopyFlagVals = array('Y','N','P','='))
973  {
974  $sqla = array();
975  $src = $this->connSrc;
976  $idx = $srcTable.'_mrgIdx';
977  $cols = $src->MetaColumns($srcTable);
978  #adodb_pr($cols);
979  if (!isset($cols[strtoupper($srcUpdateDateFld)])) {
980  $sqla = $this->ddSrc->AddColumnSQL($srcTable, "$srcUpdateDateFld TS DEFTIMESTAMP");
981  foreach($sqla as $sql) $src->Execute($sql);
982  }
983 
984  if ($srcCopyDateFld && !isset($cols[strtoupper($srcCopyDateFld)])) {
985  $sqla = $this->ddSrc->AddColumnSQL($srcTable, "$srcCopyDateFld TS DEFTIMESTAMP");
986  foreach($sqla as $sql) $src->Execute($sql);
987  }
988 
989  $sysdate = $src->sysTimeStamp;
990  $arrv0 = $src->qstr($srcCopyFlagVals[0]);
991  $arrv1 = $src->qstr($srcCopyFlagVals[1]);
992  $arrv2 = $src->qstr($srcCopyFlagVals[2]);
993  $arrv3 = $src->qstr($srcCopyFlagVals[3]);
994 
995  if ($srcCopyFlagFld && !isset($cols[strtoupper($srcCopyFlagFld)])) {
996  $sqla = $this->ddSrc->AddColumnSQL($srcTable, "$srcCopyFlagFld $srcCopyFlagType DEFAULT $arrv1");
997  foreach($sqla as $sql) $src->Execute($sql);
998  }
999 
1000  $sqla = array();
1001 
1002 
1003  $name = "{$srcTable}_mrgTr";
1004  if (is_array($pkeys) && strpos($src->databaseType,'mssql') !== false) {
1005  $pk = reset($pkeys);
1006 
1007  #$sqla[] = "DROP TRIGGER $name";
1008  $sqltr = "
1009  TRIGGER $name
1010  ON $srcTable /* for data replication and merge */
1011  AFTER UPDATE
1012  AS
1013  UPDATE $srcTable
1014  SET
1015  $srcUpdateDateFld = case when I.$srcCopyFlagFld = $arrv2 or I.$srcCopyFlagFld = $arrv3 then I.$srcUpdateDateFld
1016  else $sysdate end,
1017  $srcCopyFlagFld = case
1018  when I.$srcCopyFlagFld = $arrv2 then $arrv0
1019  when I.$srcCopyFlagFld = $arrv3 then D.$srcCopyFlagFld
1020  else $arrv1 end
1021  FROM $srcTable S Join Inserted AS I on I.$pk = S.$pk
1022  JOIN Deleted as D ON I.$pk = D.$pk
1023  WHERE I.$srcCopyFlagFld = D.$srcCopyFlagFld or I.$srcCopyFlagFld = $arrv2
1024  or I.$srcCopyFlagFld = $arrv3 or I.$srcCopyFlagFld is null
1025  ";
1026  $sqla[] = 'CREATE '.$sqltr; // first if does not exists
1027  $sqla[] = 'ALTER '.$sqltr; // second if it already exists
1028  } else if (strpos($src->databaseType,'oci') !== false) {
1029 
1030  if (strlen($srcTable)>22) $tableidx = substr($srcTable,0,16).substr(crc32($srcTable),6);
1031  else $tableidx = $srcTable;
1032 
1033  $name = $tableidx.$this->trgSuffix;
1034  $idx = $tableidx.$this->idxSuffix;
1035  $sqla[] = "
1036 CREATE OR REPLACE TRIGGER $name /* for data replication and merge */
1037 BEFORE UPDATE ON $srcTable REFERENCING NEW AS NEW OLD AS OLD
1038 FOR EACH ROW
1039 BEGIN
1040  if :new.$srcCopyFlagFld = $arrv2 then
1041  :new.$srcCopyFlagFld := $arrv0;
1042  elsif :new.$srcCopyFlagFld = $arrv3 then
1043  :new.$srcCopyFlagFld := :old.$srcCopyFlagFld;
1044  elsif :old.$srcCopyFlagFld = :new.$srcCopyFlagFld or :new.$srcCopyFlagFld is null then
1045  if $this->trLogic then
1046  :new.$srcUpdateDateFld := $sysdate;
1047  :new.$srcCopyFlagFld := $arrv1;
1048  end if;
1049  end if;
1050 END;
1051 ";
1052  }
1053  foreach($sqla as $sql) $src->Execute($sql);
1054 
1055  if ($srcCopyFlagFld) $srcCopyFlagFld .= ', ';
1056  $src->Execute("CREATE INDEX {$idx} on $srcTable ($srcCopyFlagFld$srcUpdateDateFld)");
1057  }
1058 
1059 
1060  /*
1061  Perform Merge by copying all data modified from src to dest
1062  then update src copied flag if present.
1063 
1064  Returns array taken from ReplicateData:
1065 
1066  Returns an array:
1067  $arr[0] = true if no error, false if error
1068  $arr[1] = number of recs processed
1069  $arr[2] = number of successful inserts
1070  $arr[3] = number of successful updates
1071 
1072  $srcTable = src table
1073  $dstTable = dest table
1074  $pkeys = primary keys array. if empty, then only inserts will occur
1075  $srcignoreflds = ignore these flds (must be upper cased)
1076  $setsrc = updateSrcFn string
1077  $srcUpdateDateFld = field in src with the last update date
1078  $srcCopyFlagFld = false = optional field that holds the copied indicator
1079  $flagvals=array('Y','N','P','=') = array of values indicating array(copied, not copied).
1080  Null is assumed to mean not copied. The 3rd value 'P' indicates that we want to force 'Y', bypassing
1081  default trigger behaviour to reset the COPIED='N' when the record is replicated from other side.
1082  The last value '=' is don't change copyflag.
1083  $srcCopyDateFld = field that holds last copy date in src table, which will be updated on Merge()
1084  $dstCopyDateFld = field that holds last copy date in dst table, which will be updated on Merge()
1085  $defaultDestRaiseErrorFn = The adodb raiseErrorFn handler. Default is to not raise an error.
1086  Just output error message to stdout
1087 
1088  */
1089 
1090 
1091  function Merge($srcTable, $dstTable, $pkeys, $srcignoreflds, $setsrc,
1092  $srcUpdateDateFld,
1093  $srcCopyFlagFld, $flagvals=array('Y','N','P','='),
1094  $srcCopyDateFld = false,
1095  $dstCopyDateFld = false,
1096  $whereClauses = '',
1097  $orderBy = '', # MUST INCLUDE THE "ORDER BY" suffix
1098  $copyDoneFlagIdx = 3,
1099  $defaultDestRaiseErrorFn = '')
1100  {
1101  $src = $this->connSrc;
1102  $dest = $this->connDest;
1103 
1104  $time = $src->Time();
1105 
1106  $delfirst = $this->deleteFirst;
1107  $upd = $this->updateSrcFn;
1108 
1109  $this->deleteFirst = false;
1110  //$this->updateFirst = true;
1111 
1112  $srcignoreflds[] = $srcUpdateDateFld;
1113  $srcignoreflds[] = $srcCopyFlagFld;
1114  $srcignoreflds[] = $srcCopyDateFld;
1115 
1116  if (empty($whereClauses)) $whereClauses = '1=1';
1117  $where = " WHERE ($whereClauses) and ($srcCopyFlagFld = ".$src->qstr($flagvals[1]).')';
1118  if ($orderBy) $where .= ' '.$orderBy;
1119  else $where .= ' ORDER BY '.$srcUpdateDateFld;
1120 
1121  if ($setsrc) $set[] = $setsrc;
1122  else $set = array();
1123 
1124  if ($srcCopyFlagFld) $set[] = "$srcCopyFlagFld = ".$src->qstr($flagvals[2]);
1125  if ($srcCopyDateFld) $set[]= "$srcCopyDateFld = ".$src->sysTimeStamp;
1126  if ($set) $this->updateSrcFn = array(implode(', ',$set));
1127  else $this->updateSrcFn = '';
1128 
1129 
1130  $extra[$srcCopyFlagFld] = array($dest->qstr($flagvals[0]),$dest->qstr($flagvals[$copyDoneFlagIdx]));
1131 
1132  $saveraise = $dest->raiseErrorFn;
1133  $dest->raiseErrorFn = '';
1134 
1135  if ($this->compat && $this->compat == 1.0) $srcUpdateDateFld = '';
1136  $arr = $this->ReplicateData($srcTable, $dstTable, $pkeys, $where, $srcignoreflds,
1137  $dstCopyDateFld,$extra,$srcUpdateDateFld);
1138 
1139  $dest->raiseErrorFn = $saveraise;
1140 
1141  $this->updateSrcFn = $upd;
1142  $this->deleteFirst = $delfirst;
1143 
1144  return $arr;
1145  }
1146  /*
1147  If doing a 2 way merge, then call
1148  $rep->Merge()
1149  to save without modifying the COPIEDFLAG ('=').
1150 
1151  Then can the following to set the COPIEDFLAG to 'P' which forces the COPIEDFLAG = 'Y'
1152  $rep->MergeDone()
1153  */
1154 
1155  function MergeDone($srcTable, $dstTable, $pkeys, $srcignoreflds, $setsrc,
1156  $srcUpdateDateFld,
1157  $srcCopyFlagFld, $flagvals=array('Y','N','P','='),
1158  $srcCopyDateFld = false,
1159  $dstCopyDateFld = false,
1160  $whereClauses = '',
1161  $orderBy = '', # MUST INCLUDE THE "ORDER BY" suffix
1162  $copyDoneFlagIdx = 2,
1163  $defaultDestRaiseErrorFn = '')
1164  {
1165  return $this->Merge($srcTable, $dstTable, $pkeys, $srcignoreflds, $setsrc,
1166  $srcUpdateDateFld,
1167  $srcCopyFlagFld, $flagvals,
1168  $srcCopyDateFld,
1169  $dstCopyDateFld,
1170  $whereClauses,
1171  $orderBy, # MUST INCLUDE THE "ORDER BY" suffix
1172  $copyDoneFlagIdx,
1173  $defaultDestRaiseErrorFn);
1174  }
1175 
1176  function _doerr($reason, $selflds)
1177  {
1178  $fn = $this->errHandler;
1179  if ($fn) $fn($this, $reason, $selflds); // set $this->neverAbort to true or false as required inside $fn
1180  }
1181 }
1182 
1183 ?>




Korrekturen, Hinweise und Ergänzungen

Bitte scheuen Sie sich nicht und melden Sie, was auf dieser Seite sachlich falsch oder irreführend ist, was ergänzt werden sollte, was fehlt usw. Dazu bitte oben aus dem Menü Seite den Eintrag Support Forum wählen. Es ist eine kostenlose Anmeldung erforderlich, um Anmerkungen zu posten. Unpassende Postings, Spam usw. werden kommentarlos entfernt.