1 /*
2 * Copyright (C) 2010 Canonical, Ltd.
3 *
4 * This library is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU Lesser General Public License
6 * version 3.0 as published by the Free Software Foundation.
7 *
8 * This library is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU Lesser General Public License version 3.0 for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public
14 * License along with this library. If not, see
15 * <http://www.gnu.org/licenses/>.
16 *
17 * Authored by:
18 * Mikkel Kamstrup Erlandsen <mikkel.kamstrup@canonical.com>
19 * Neil Jagdish Patel <neil.patel@canonical.com>
20 * Michal Hruby <michal.hruby@canonical.com>
21 */
22
23 /**
24 * SECTION:dee-shared-model
25 * @short_description: A #DeeModel that can synchronize with other
26 * #DeeSharedModel objects across D-Bus.
27 * @include: dee.h
28 *
29 * #DeeSharedModel is created with a name (usually namespaced and unique to
30 * your program(s)) which is used to locate other #DeeSharedModels created
31 * with the same name through D-Bus, and will keep synchronized with them.
32 *
33 * This allows to you build MVC programs with a sane model API, but have the
34 * controller (or multiple views) in a separate process.
35 *
36 * Before you modify the contents of the shared model it is important that
37 * you wait for the model to synchronize with its peers. The normal way to do
38 * this is to wait for the "notify::synchronized" signal.
39 *
40 */
41 #ifdef HAVE_CONFIG_H
42 #include <config.h>
43 #endif
44
45 #include <memory.h>
46 #include <time.h>
47 #include <unistd.h>
48
49 #include "dee-peer.h"
50 #include "dee-model.h"
51 #include "dee-proxy-model.h"
52 #include "dee-sequence-model.h"
53 #include "dee-shared-model.h"
54 #include "dee-serializable-model.h"
55 #include "dee-serializable.h"
56 #include "dee-marshal.h"
57 #include "trace-log.h"
58 #include "com.canonical.Dee.Model-xml.h"
59
60 static void dee_shared_model_serializable_iface_init (DeeSerializableIface *iface);
61
62 static void dee_shared_model_model_iface_init (DeeModelIface *iface);
63
64 G_DEFINE_TYPE_WITH_CODE (DeeSharedModel,
65 dee_shared_model,
66 DEE_TYPE_PROXY_MODEL,
67 G_IMPLEMENT_INTERFACE (DEE_TYPE_SERIALIZABLE,
68 dee_shared_model_serializable_iface_init)
69 G_IMPLEMENT_INTERFACE (DEE_TYPE_MODEL,
70 dee_shared_model_model_iface_init));
71
72 #define DEE_SHARED_MODEL_GET_PRIVATE(obj) \
73 (G_TYPE_INSTANCE_GET_PRIVATE(obj, DEE_TYPE_SHARED_MODEL, DeeSharedModelPrivate))
74
75 #define DBUS_TYPE_G_VALUE_ARRAY_ARRAY (dbus_g_type_get_collection ("GPtrArray", dbus_g_type_get_collection("GPtrArray", G_TYPE_VALUE)))
76
77 /**
78 * DeeSharedModelPrivate:
79 *
80 * Ignore this structure.
81 **/
82 struct _DeeSharedModelPrivate
83 {
84 DeePeer *swarm;
85 GSList *connections;
86 gchar *model_path;
87
88 guint64 last_committed_seqnum;
89 /* Buffer of DeeSharedModelRevisions that we keep in order to batch
90 * our DBus signals. The invariant is that all buffered revisions
91 * are of the same type */
92 GSList *revision_queue;
93 guint revision_queue_timeout_id;
94 guint acquisition_timer_id;
95 gulong swarm_leader_handler;
96 gulong connection_acquired_handler;
97 gulong connection_closed_handler;
98 GArray *connection_infos;
99
100 gboolean synchronized;
101 gboolean found_first_peer;
102 gboolean suppress_remote_signals;
103 };
104
105 typedef struct
106 {
107 /* The revision type is: ROWS_ADDED, ROWS_REMOVED, or ROWS_CHANGED */
108 guchar change_type;
109 guint32 pos;
110 guint64 seqnum;
111 GVariant **row;
112 DeeModel *model;
113 } DeeSharedModelRevision;
114
115 typedef struct
116 {
117 GDBusConnection *connection;
118 guint signal_subscription_id;
119 guint registration_id;
120 } DeeConnectionInfo;
121 /* Globals */
122 static GQuark dee_shared_model_error_quark = 0;
123
124 enum
125 {
126 PROP_0,
127 PROP_PEER,
128 PROP_SYNCHRONIZED,
129 };
130
131 typedef enum
132 {
133 CHANGE_TYPE_ADD = '\x00',
134 CHANGE_TYPE_REMOVE = '\x01',
135 CHANGE_TYPE_CHANGE = '\x02',
136 CHANGE_TYPE_CLEAR = '\x03',
137 } ChangeType;
138
139
140 enum
141 {
142 /* Public signal */
143 BEGIN_TRANSACTION,
144 END_TRANSACTION,
145
146 LAST_SIGNAL
147 };
148
149 static guint32 _signals[LAST_SIGNAL] = { 0 };
150
151 /* Forwards */
152 static void on_connection_acquired (DeeSharedModel *self,
153 GDBusConnection *connection,
154 DeePeer *peer);
155
156 static void on_connection_closed (DeeSharedModel *self,
157 GDBusConnection *connection,
158 DeePeer *peer);
159
160 static void commit_transaction (DeeSharedModel *self,
161 const gchar *sender_name,
162 GVariant *transaction);
163
164 static void on_clone_received (GObject *source_object,
165 GAsyncResult *res,
166 gpointer user_data);
167
168 static void clone_leader (DeeSharedModel *self);
169
170 static void on_dbus_signal_received (GDBusConnection *connection,
171 const gchar *sender_name,
172 const gchar *object_path,
173 const gchar *interface_name,
174 const gchar *signal_name,
175 GVariant *parameters,
176 gpointer user_data);
177
178 static void on_leader_changed (DeeSharedModel *self);
179
180 static DeeSharedModelRevision*
181 dee_shared_model_revision_new (ChangeType type,
182 guint32 pos,
183 guint64 seqnum,
184 GVariant **row,
185 DeeModel *model);
186
187 static void dee_shared_model_revision_free (DeeSharedModelRevision *rev);
188
189 static gboolean flush_revision_queue_timeout_cb (DeeModel *self);
190 static guint flush_revision_queue (DeeModel *self);
191
192 static void enqueue_revision (DeeModel *self,
193 ChangeType type,
194 guint32 pos,
195 guint64 seqnum,
196 GVariant **row);
197
198 static void on_self_row_added (DeeModel *self,
199 DeeModelIter *iter);
200
201 static void on_self_row_removed (DeeModel *self,
202 DeeModelIter *iter);
203
204 static void on_self_row_changed (DeeModel *self,
205 DeeModelIter *iter);
206
207 static void reset_model (DeeModel *self);
208
209 static void invalidate_peer (DeeSharedModel *self,
210 const gchar *sender_name,
211 GDBusConnection *except);
212
213 static gboolean on_invalidate (DeeSharedModel *self);
214
215
216 /* Create a new revision. The revision will own @row */
217 static DeeSharedModelRevision*
218 dee_shared_model_revision_new (ChangeType type,
219 guint32 pos,
220 guint64 seqnum,
221 GVariant **row,
222 DeeModel *model)
223 {
224 DeeSharedModelRevision *rev;
225
226 g_return_val_if_fail (type != CHANGE_TYPE_REMOVE &&
227 type != CHANGE_TYPE_CLEAR ? row != NULL : TRUE, NULL);
228
229 // FIXME: Use g_slice instead og g_new
230 rev = g_slice_new (DeeSharedModelRevision);
231 rev->change_type = (guchar) type;
232 rev->pos = pos;
233 rev->seqnum = seqnum;
234 rev->row = row;
235 rev->model = model;
236
237 return rev;
238 }
239
240 /* Free all resources owned by a revision, and the revision itself */
241 static void
242 dee_shared_model_revision_free (DeeSharedModelRevision *rev)
243 {
244 guint n_cols, i;
245 gsize row_slice_size;
246
247 g_return_if_fail (rev != NULL);
248
249 n_cols = dee_model_get_n_columns (rev->model);
250 row_slice_size = n_cols * sizeof(gpointer);
251
252 for (i = 0; i < n_cols && rev->row != NULL; i++)
253 g_variant_unref (rev->row[i]);
254
255 g_slice_free1 (row_slice_size, rev->row);
256 g_slice_free (DeeSharedModelRevision, rev);
257 }
258
259 static gboolean
260 flush_revision_queue_timeout_cb (DeeModel *self)
261 {
262 DeeSharedModelPrivate *priv;
263 g_return_val_if_fail (DEE_IS_SHARED_MODEL (self), FALSE);
264 priv = DEE_SHARED_MODEL (self)->priv;
265
266 priv->revision_queue_timeout_id = 0;
267 flush_revision_queue (self);
268
269 return FALSE;
270 }
271
272 /* Emit all queued revisions in one signal on the bus.
273 * Clears the revision_queue_timeout if there is one set.
274 * Returns the number of flushed revisions */
275 static guint
276 flush_revision_queue (DeeModel *self)
277 {
278 DeeSharedModelPrivate *priv;
279 DeeSharedModelRevision *rev;
280 GError *error;
281 GSList *iter;
282 GSList *connection_iter;
283 GVariant *schema;
284 GVariant *transaction_variant;
285 GVariantBuilder aav, au, ay, transaction;
286 guint64 seqnum_begin = 0, seqnum_end = 0;
287 guint n_cols, i;
288
289 g_return_val_if_fail (DEE_IS_SHARED_MODEL (self), 0);
290 priv = DEE_SHARED_MODEL (self)->priv;
291
292 /* If we are not connected yet, this should be a no-op.
293 * There are two cases to consider:
294 * 1) We are building a model before we are even connected.
295 * This only makes sense if we are sure to become leaders,
296 * we'll assume the programmer knows this
297 * 2) We are resetting the model - no problem
298 */
299 if (priv->connections == NULL)
300 {
301 trace_object (self, "Flushing revision queue, without a connection. "
302 "This will blow up unless you are the leader model");
303 g_slist_foreach (priv->revision_queue,
304 (GFunc) dee_shared_model_revision_free,
305 NULL);
306 g_slist_free (priv->revision_queue);
307 priv->revision_queue = NULL;
308 }
309
310 /* Clear the current timeout if we have one running */
311 if (priv->revision_queue_timeout_id != 0)
312 {
313 g_source_remove (priv->revision_queue_timeout_id);
314 priv->revision_queue_timeout_id = 0;
315 }
316
317 /* If we don't have anything queued up, just return. It's assumed beyond
318 * this point that it is non-empty */
319 if (priv->revision_queue == NULL)
320 {
321 priv->last_committed_seqnum = dee_serializable_model_get_seqnum (self);
322 return 0;
323 }
324
325 /* Since we always prepend to the queue we need to reverse it */
326 priv->revision_queue = g_slist_reverse (priv->revision_queue);
327
328 n_cols = dee_model_get_n_columns (self);
329
330 /* We know that the revision_queue is non-empty at this point. We peek the
331 * first element and assume that the last seqnum before this transaction
332 * started was the seqnum in the first revision - 1. */
333 seqnum_end = ((DeeSharedModelRevision *) priv->revision_queue->data)->seqnum - 1;
334 seqnum_begin = priv->last_committed_seqnum;
335
336 g_variant_builder_init (&aav, G_VARIANT_TYPE ("aav"));
337 g_variant_builder_init (&au, G_VARIANT_TYPE ("au"));
338 g_variant_builder_init (&ay, G_VARIANT_TYPE ("ay"));
339 for (iter = priv->revision_queue; iter; iter = iter->next)
340 {
341 gboolean is_remove;
342 rev = (DeeSharedModelRevision*) iter->data;
343 is_remove = rev->change_type == CHANGE_TYPE_REMOVE ||
344 rev->change_type == CHANGE_TYPE_CLEAR;
345
346 /* Sanity check our seqnums */
347 if (rev->seqnum != seqnum_end + 1)
348 {
349 g_critical ("Internal accounting error of DeeSharedModel@%p. Seqnums "
350 "not sequential: "
351 "%"G_GUINT64_FORMAT" != %"G_GUINT64_FORMAT" + 1",
352 self, rev->seqnum, seqnum_end);
353 return 0;
354 }
355 seqnum_end = rev->seqnum;
356
At conditional: "rev->row == NULL" taking True branch"
At conditional: "is_remove != (rev->row == NULL)" taking True branch"
CID 10036 - FORWARD_NULL
Comparing "rev->row" to null implies that "rev->row" might be null.
357 if ((is_remove) != (rev->row == NULL))
358 {
359 g_critical ("Internal accounting error is DeeSharedModel@%p. "
360 "Transaction row payload must be empty iff the change"
361 "type is is a removal", self);
362 }
363
364 /* Build the variants for this change */
365 g_variant_builder_open (&aav, G_VARIANT_TYPE ("av"));
At conditional: "i < n_cols" taking True branch"
At conditional: "!is_remove" taking True branch"
366 for (i = 0; i < n_cols && !is_remove; i++)
367 {
CID 10036 - FORWARD_NULL
Dereferencing null variable "rev->row".
368 g_variant_builder_add_value (&aav,
369 g_variant_new_variant (rev->row[i]));
370 }
371 g_variant_builder_close (&aav);
372 g_variant_builder_add (&au, "u", rev->pos);
373 g_variant_builder_add (&ay, "y", (guchar) rev->change_type);
374
375 /* Free the revisions while we are traversing the linked list anyway */
376 dee_shared_model_revision_free (rev);
377 }
378
379 /* Collect the schema */
380 schema = g_variant_new_strv (dee_model_get_schema(self, NULL), -1);
381
382 /* Build the Commit message */
383 g_variant_builder_init (&transaction, G_VARIANT_TYPE ("(sasaavauay(tt))"));
384 g_variant_builder_add (&transaction, "s", dee_peer_get_swarm_name (priv->swarm));
385 g_variant_builder_add_value (&transaction, schema);
386 g_variant_builder_add_value (&transaction, g_variant_builder_end (&aav));
387 g_variant_builder_add_value (&transaction, g_variant_builder_end (&au));
388 g_variant_builder_add_value (&transaction, g_variant_builder_end (&ay));
389 g_variant_builder_add_value (&transaction,
390 g_variant_new ("(tt)", seqnum_begin, seqnum_end));
391
392 transaction_variant = g_variant_builder_end (&transaction);
393
394 /* Throw a Commit signal */
395 for (connection_iter = priv->connections; connection_iter != NULL;
396 connection_iter = connection_iter->next)
397 {
398 error = NULL;
399 g_dbus_connection_emit_signal((GDBusConnection*) connection_iter->data,
400 NULL,
401 priv->model_path,
402 "com.canonical.Dee.Model",
403 "Commit",
404 transaction_variant,
405 &error);
406
407 if (error != NULL)
408 {
409 g_critical ("Failed to emit DBus signal "
410 "com.canonical.Dee.Model.Commit: %s", error->message);
411 g_error_free (error);
412 }
413 }
414
415 trace_object (self, "Flushed %"G_GUINT64_FORMAT" revisions. "
416 "Seqnum range %"G_GUINT64_FORMAT"-%"G_GUINT64_FORMAT,
417 seqnum_end - seqnum_begin, seqnum_begin, seqnum_end);
418
419 /* Free and reset the queue. Note that we freed the individual revisions while
420 * we constructed the Commit message */
421 g_slist_free (priv->revision_queue);
422 priv->revision_queue = NULL;
423
424 priv->last_committed_seqnum = seqnum_end;
425
426 return seqnum_end - seqnum_begin; // Very theoretical overflow possible here...
427 }
428
429 /* Prepare a revision to be emitted as a signal on the bus. The revisions
430 * are queued up so that we can emit them in batches. Steals the ref on the
431 * row array and assumes the refs on the variants as well */
432 static void
433 enqueue_revision (DeeModel *self,
434 ChangeType type,
435 guint32 pos,
436 guint64 seqnum,
437 GVariant **row)
438 {
439 DeeSharedModelPrivate *priv;
440 DeeSharedModelRevision *rev;
441
442 g_return_if_fail (DEE_IS_SHARED_MODEL (self));
443 priv = DEE_SHARED_MODEL (self)->priv;
444
445 rev = dee_shared_model_revision_new (type, pos, seqnum, row, self);
446
447 priv->revision_queue = g_slist_prepend (priv->revision_queue, rev);
448
449 /* Flush the revision queue once in idle */
450 if (priv->revision_queue_timeout_id == 0)
451 {
452 priv->revision_queue_timeout_id =
453 g_idle_add ((GSourceFunc)flush_revision_queue_timeout_cb, self);
454 }
455 }
456
457 /* GObject stuff */
458 static void
459 dee_shared_model_finalize (GObject *object)
460 {
461 guint i;
462 DeeSharedModelPrivate *priv = DEE_SHARED_MODEL (object)->priv;
463
464 /* Flush any pending revisions */
465 if (priv->revision_queue != NULL)
466 {
467 flush_revision_queue (DEE_MODEL(object));
468 priv->revision_queue = NULL;
469 }
470
471 if (priv->acquisition_timer_id != 0)
472 {
473 g_source_remove (priv->acquisition_timer_id);
474 priv->acquisition_timer_id = 0;
475 }
476
477 if (priv->connection_acquired_handler)
478 {
479 g_signal_handler_disconnect (priv->swarm,
480 priv->connection_acquired_handler);
481 priv->connection_acquired_handler = 0;
482 }
483
484 if (priv->connection_closed_handler)
485 {
486 g_signal_handler_disconnect (priv->swarm, priv->connection_closed_handler);
487 priv->connection_closed_handler = 0;
488 }
489
490 if (priv->connection_infos != NULL)
491 {
492 for (i = 0; i < priv->connection_infos->len; i++)
493 {
494 DeeConnectionInfo *info;
495 info = &g_array_index (priv->connection_infos, DeeConnectionInfo, i);
496 g_dbus_connection_unregister_object (info->connection,
497 info->registration_id);
498 g_dbus_connection_signal_unsubscribe (info->connection,
499 info->signal_subscription_id);
500 }
501
502 g_array_unref (priv->connection_infos);
503 priv->connection_infos = NULL;
504 }
505 if (priv->swarm_leader_handler != 0)
506 {
507 g_signal_handler_disconnect (priv->swarm, priv->swarm_leader_handler);
508 priv->swarm_leader_handler = 0;
509 }
510 if (priv->model_path)
511 {
512 g_free (priv->model_path);
513 }
514 if (priv->connections)
515 {
516 g_slist_free (priv->connections);
517 priv->connections = NULL;
518 }
519 if (priv->swarm)
520 {
521 g_object_unref (priv->swarm);
522 priv->swarm = NULL;
523 }
524
525 G_OBJECT_CLASS (dee_shared_model_parent_class)->finalize (object);
526 }
527
528 static void
529 dee_shared_model_set_property (GObject *object,
530 guint id,
531 const GValue *value,
532 GParamSpec *pspec)
533 {
534 DeeSharedModelPrivate *priv;
535
536 priv = DEE_SHARED_MODEL (object)->priv;
537
538 switch (id)
539 {
540 case PROP_PEER:
541 if (priv->swarm != NULL)
542 g_object_unref (priv->swarm);
543 priv->swarm = g_value_dup_object (value);
544 break;
545 case PROP_SYNCHRONIZED:
546 g_critical ("Trying to set read only property DeeSharedModel:synchronized");
547 break;
548 default:
549 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, id, pspec);
550 break;
551 }
552 }
553
554 static void
555 dee_shared_model_get_property (GObject *object,
556 guint id,
557 GValue *value,
558 GParamSpec *pspec)
559 {
560 DeeSharedModelPrivate *priv;
561
562 priv = DEE_SHARED_MODEL (object)->priv;
563
564 switch (id)
565 {
566 case PROP_PEER:
567 g_value_set_object (value, priv->swarm);
568 break;
569 case PROP_SYNCHRONIZED:
570 g_value_set_boolean (value, priv->synchronized);
571 break;
572 default:
573 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, id, pspec);
574 break;
575 }
576 }
577
578 static gboolean
579 iterate_connections (DeeSharedModel *self)
580 {
581 DeeSharedModelPrivate *priv;
582 GSList *iter;
583
584 g_return_val_if_fail (DEE_IS_SHARED_MODEL (self), FALSE);
585 priv = self->priv;
586
587 priv->connections = dee_peer_get_connections (priv->swarm);
588
589 for (iter = priv->connections; iter != NULL; iter = iter->next)
590 {
591 on_connection_acquired (self, (GDBusConnection*) iter->data, priv->swarm);
592 }
593
594 priv->acquisition_timer_id = 0;
595
596 return FALSE;
597 }
598
599 static void
600 dee_shared_model_constructed (GObject *object)
601 {
602 DeeSharedModel *self;
603 DeeSharedModelPrivate *priv;
604 gchar *dummy;
605
606 /* GObjectClass has NULL 'constructed' member, but we add this check for
607 * future robustness if we ever move to another base class */
608 if (G_OBJECT_CLASS (dee_shared_model_parent_class)->constructed != NULL)
609 G_OBJECT_CLASS (dee_shared_model_parent_class)->constructed (object);
610
611 self = DEE_SHARED_MODEL (object);
612 priv = self->priv;
613
614 if (priv->swarm == NULL)
615 {
616 g_critical ("You must create a DeeSharedModel with a DeePeer "
617 "in the 'peer' property");
618 return;
619 }
620
621 /* Create a canonical object path from the well known swarm name */
622 dummy = g_strdup (dee_peer_get_swarm_name (priv->swarm));
623 priv->model_path = g_strconcat ("/com/canonical/dee/model/",
624 g_strdelimit (dummy, ".", '/'),
625 NULL);
626 g_free (dummy);
627
628 priv->swarm_leader_handler =
629 g_signal_connect_swapped (priv->swarm, "notify::swarm-leader",
630 G_CALLBACK (on_leader_changed), self);
631
632 priv->connection_acquired_handler =
633 g_signal_connect_swapped (priv->swarm, "connection-acquired",
634 G_CALLBACK (on_connection_acquired), self);
635
636 priv->connection_closed_handler =
637 g_signal_connect_swapped (priv->swarm, "connection-closed",
638 G_CALLBACK (on_connection_closed), self);
639
640 /* we don't want to invoke on_connection_acquired from here, it would mean
641 * emitting important signal when inside g_object_new */
642 /* using G_PRIORITY_DEFAULT will ensure that this will be dispatched before
643 * we'll have a chance to acquire new connections... FIXME: right? */
644 priv->acquisition_timer_id = g_idle_add_full (G_PRIORITY_DEFAULT,
645 (GSourceFunc) iterate_connections, self, NULL);
646 }
647
648 static void
649 dee_shared_model_class_init (DeeSharedModelClass *klass)
650 {
651 GParamSpec *pspec;
652 GObjectClass *obj_class = G_OBJECT_CLASS (klass);
653
654 obj_class->finalize = dee_shared_model_finalize;
655 obj_class->set_property = dee_shared_model_set_property;
656 obj_class->get_property = dee_shared_model_get_property;
657 obj_class->constructed = dee_shared_model_constructed;
658
659 /**
660 * DeeSharedModel:peer:
661 *
662 * The #DeePeer that this model uses to connect to the swarm
663 */
664 pspec = g_param_spec_object ("peer", "Peer",
665 "The peer object that monitors the swarm",
666 DEE_TYPE_PEER,
667 G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY);
668 g_object_class_install_property (obj_class, PROP_PEER, pspec);
669
670 /**
671 * DeeSharedModel:synchronized:
672 *
673 * Boolean property defining whether or not the model has synchronized with
674 * its peers (if any) yet.
675 *
676 * You should not modify a #DeeSharedModel that is not synchronized. Before
677 * modifying the model in any way (except calling dee_model_set_schema())
678 * you should wait for it to become synchronized.
679 */
680 pspec = g_param_spec_boolean("synchronized", "Synchronized",
681 "Whether the model is synchronized with its peers",
682 FALSE,
683 G_PARAM_READABLE);
684 g_object_class_install_property (obj_class, PROP_SYNCHRONIZED, pspec);
685
686 /**
687 * DeeSharedModel::begin-transaction:
688 * @model: The shared model the signal is emitted on
689 * @begin_seqnum: The seqnum the model has now
690 * @end_seqnum: The seqnum the model will have after the transaction is applied
691 *
692 * Emitted right before a remote transaction will be committed to the model.
693 */
694 _signals[BEGIN_TRANSACTION] =
695 g_signal_new ("begin-transaction",
696 DEE_TYPE_SHARED_MODEL,
697 G_SIGNAL_RUN_LAST,
698 0,
699 NULL, NULL,
700 _dee_marshal_VOID__UINT64_UINT64,
701 G_TYPE_NONE, 2,
702 G_TYPE_UINT64, G_TYPE_UINT64);
703
704 /**
705 * DeeSharedModel::end-transaction:
706 * @model: The shared model the signal is emitted on
707 * @begin_seqnum: The seqnum the model had before the transaction was applied
708 * @end_seqnum: The seqnum the model has now
709 *
710 * Emitted right after a remote transaction has been committed to the model.
711 */
712 _signals[END_TRANSACTION] =
713 g_signal_new ("end-transaction",
714 DEE_TYPE_SHARED_MODEL,
715 G_SIGNAL_RUN_LAST,
716 0,
717 NULL, NULL,
718 _dee_marshal_VOID__UINT64_UINT64,
719 G_TYPE_NONE, 2,
720 G_TYPE_UINT64, G_TYPE_UINT64);
721
722 /* Add private data */
723 g_type_class_add_private (obj_class, sizeof (DeeSharedModelPrivate));
724 }
725
726 static void
727 dee_shared_model_init (DeeSharedModel *self)
728 {
729 DeeSharedModelPrivate *priv;
730
731 priv = self->priv = DEE_SHARED_MODEL_GET_PRIVATE (self);
732
733 priv->swarm = NULL;
734 priv->model_path = NULL;
735
736 priv->last_committed_seqnum = 0;
737 priv->revision_queue = NULL;
738 priv->revision_queue_timeout_id = 0;
739 priv->swarm_leader_handler = 0;
740
741 priv->synchronized = FALSE;
742 priv->found_first_peer = FALSE;
743 priv->suppress_remote_signals = FALSE;
744
745 if (!dee_shared_model_error_quark)
746 dee_shared_model_error_quark = g_quark_from_string ("dbus-model-error");
747
748 priv->connections = NULL;
749 priv->connection_infos = g_array_new (FALSE, TRUE, sizeof (DeeConnectionInfo));
750
751 /* Connect to our own signals so we can queue up revisions to be emitted
752 * on the bus */
753 g_signal_connect (self, "row-added", G_CALLBACK (on_self_row_added), NULL);
754 g_signal_connect (self, "row-removed", G_CALLBACK (on_self_row_removed), NULL);
755 g_signal_connect (self, "row-changed", G_CALLBACK (on_self_row_changed), NULL);
756 }
757
758 static void
759 handle_dbus_method_call (GDBusConnection *connection,
760 const gchar *sender,
761 const gchar *object_path,
762 const gchar *interface_name,
763 const gchar *method_name,
764 GVariant *parameters,
765 GDBusMethodInvocation *invocation,
766 gpointer user_data)
767 {
768 GVariant *retval;
769
770 g_return_if_fail (DEE_IS_SHARED_MODEL (user_data));
771
772 if (g_strcmp0 ("Clone", method_name) == 0)
773 {
774 /* If we have anything in the rev queue it wont validate against the
775 * seqnum for the cloned model. So flush the rev queue before answering
776 * the Clone call */
777 flush_revision_queue (DEE_MODEL (user_data));
778
779 /* We return a special error if we have no schema. It's legal for the
780 * leader to expect the schema from the slaves */
781 if (dee_model_get_n_columns (DEE_MODEL (user_data)) == 0)
782 {
783 g_dbus_method_invocation_return_dbus_error (invocation,
784 "com.canonical.Dee.Model.NoSchemaError",
785 "No schema defined");
786 }
787 else
788 {
789 // FIXME: It can be expensive to build the clone. Perhaps thread this?
790 retval = dee_serializable_serialize(DEE_SERIALIZABLE (user_data));
791 g_dbus_method_invocation_return_value (invocation, retval);
792 }
793 }
794 else if (g_strcmp0 ("Invalidate", method_name) == 0)
795 {
796 on_invalidate (DEE_SHARED_MODEL (user_data));
797 g_dbus_method_invocation_return_value (invocation, NULL);
798 }
799 else
800 {
801 g_warning ("Unknown DBus method call %s.%s from %s on DeeSharedModel",
802 interface_name, method_name, sender);
803 }
804 }
805
806 static const GDBusInterfaceVTable model_interface_vtable =
807 {
808 handle_dbus_method_call,
809 NULL,
810 NULL
811 };
812
813 static void
814 on_connection_acquired (DeeSharedModel *self,
815 GDBusConnection *connection,
816 DeePeer *peer)
817 {
818 DeeSharedModelPrivate *priv;
819 DeeConnectionInfo connection_info;
820 GDBusNodeInfo *model_introspection_data;
821 guint dbus_signal_handler;
822 guint model_registration_id;
823
824 /* Keep the parsed introspection data of the Model interface around */
825 static GDBusInterfaceInfo *model_interface_info = NULL;
826
827 g_return_if_fail (DEE_IS_SHARED_MODEL (self));
828
829 priv = self->priv;
830
831 if (connection == NULL)
832 {
833 g_warning ("Internal error in DeeSharedModel. %s called with NULL "
834 "connection", __func__);
835 return;
836 }
837
838 /* Update our list of connections */
839 if (priv->connections) g_slist_free (priv->connections);
840 priv->connections = dee_peer_get_connections (priv->swarm);
841
842 /* Listen for changes from the peers in the same swarm.
843 * We do this by matching arg0 with the swarm name */
844 dbus_signal_handler = g_dbus_connection_signal_subscribe (
845 connection,
846 NULL, // sender
847 "com.canonical.Dee.Model", // iface
848 NULL, // member
849 NULL, // object path
850 dee_peer_get_swarm_name (priv->swarm), // arg0
851 G_DBUS_SIGNAL_FLAGS_NONE,
852 on_dbus_signal_received,
853 self, // user data
854 NULL); // user data destroy
855
856 /* Load com.canonical.Dee.Model introspection XML on first run */
857 if (model_interface_info == NULL)
858 {
859 model_introspection_data = g_dbus_node_info_new_for_xml (
860 com_canonical_Dee_Model_xml, NULL);
861 model_interface_info = g_dbus_node_info_lookup_interface (
862 model_introspection_data,
863 "com.canonical.Dee.Model");
864
865 g_dbus_interface_info_ref (model_interface_info);
866 g_dbus_node_info_unref (model_introspection_data);
867 }
868
869 /* Export the model on the bus */
870 model_registration_id =
871 g_dbus_connection_register_object (connection,
872 priv->model_path, /* object path */
873 model_interface_info,
874 &model_interface_vtable,
875 self, /* user_data */
876 NULL, /* user_data_free_func */
877 NULL); /* GError** */
878
879 connection_info.connection = connection;
880 connection_info.signal_subscription_id = dbus_signal_handler;
881 connection_info.registration_id = model_registration_id;
882 g_array_append_val (priv->connection_infos, connection_info);
883
884 /* If we are swarm leaders and we have column type info we are ready by now.
885 * Otherwise we will be ready when we receive the model clone from the leader
886 */
887 if (dee_peer_is_swarm_leader (priv->swarm))
888 {
889 if (dee_model_get_n_columns (DEE_MODEL (self)) > 0 && !priv->synchronized)
890 {
891 priv->synchronized = TRUE;
892 g_object_notify (G_OBJECT (self), "synchronized");
893 }
894 }
895 else if (dee_peer_get_swarm_leader (priv->swarm) != NULL)
896 {
897 /* There is a leader and it's not us.
898 * Start cloning the model of the leader */
899
900 clone_leader (self);
901 }
902 else
903 {
904 // FIXME: There's no known leader
905 }
906 }
907
908 static void
909 on_connection_closed (DeeSharedModel *self,
910 GDBusConnection *connection,
911 DeePeer *peer)
912 {
913 DeeSharedModelPrivate *priv;
914 guint i;
915
916 g_return_if_fail (DEE_IS_SHARED_MODEL (self));
917
918 priv = self->priv;
919
920 /* Update our list of connections */
921 if (priv->connections) g_slist_free (priv->connections);
922 priv->connections = dee_peer_get_connections (priv->swarm);
923
924 /* Disconnect signals etc */
925 for (i = 0; i < priv->connection_infos->len; i++)
926 {
927 DeeConnectionInfo *info;
928 info = &g_array_index (priv->connection_infos, DeeConnectionInfo, i);
929 if (info->connection == connection)
930 {
931 g_dbus_connection_unregister_object (info->connection,
932 info->registration_id);
933 g_dbus_connection_signal_unsubscribe (info->connection,
934 info->signal_subscription_id);
935 /* remove the item */
936 g_array_remove_index (priv->connection_infos, i);
937 break;
938 }
939 }
940 }
941
942 /* Callback for clone_leader() */
943 static void
944 on_clone_received (GObject *source_object,
945 GAsyncResult *res,
946 gpointer user_data)
947 {
948 DeeSharedModel *self;
949 DeeSharedModelPrivate *priv;
950 GVariant *transaction;
951 GError *error;
952 gchar *dbus_error;
953
954 g_return_if_fail (DEE_IS_SHARED_MODEL (user_data));
955
956 self = DEE_SHARED_MODEL (user_data);
957 priv = self->priv;
958
959 error = NULL;
960 transaction = g_dbus_connection_call_finish (G_DBUS_CONNECTION (source_object),
961 res, &error);
962
963 if (error != NULL)
964 {
965 dbus_error = g_dbus_error_get_remote_error (error);
966 if (g_strcmp0 (dbus_error, "com.canonical.Dee.Model.NoSchemaError") == 0)
967 {
968 trace_object (self, "Got Clone reply from leader, but leader has no schema");
969 g_free (dbus_error);
970 }
971 else
972 {
973 g_critical ("Failed to clone model from leader: %s", error->message);
974 g_error_free (error);
975 g_free (dbus_error);
976 return;
977 }
978 }
979
980 /* The transaction will be NULL if we received a com.canonical.Dee.Model.NoSchemaError,
981 * but in that case we should still consider our selves synchronized */
982 if (transaction != NULL)
983 {
984 /* Guard against a race where we might inadvertedly have accepted a Commit
985 * before receiving the initial Clone */
986 if (dee_model_get_n_columns (DEE_MODEL (self)) > 0)
987 reset_model (DEE_MODEL (self));
988
989 /* We use the swarm name as sender_name here, because DBus passes us the
990 * unique name of the swarm leader here and we want to indicate in the debug
991 * messages that the transaction came from the leader */
992 commit_transaction (self,
993 dee_shared_model_get_swarm_name (self),
994 transaction);
995 g_variant_unref (transaction);
996 }
997
998 /* If we where invalidated before, we should be fine now */
999 if (!priv->synchronized)
1000 {
1001 priv->synchronized = TRUE;
1002 g_object_notify (G_OBJECT (self), "synchronized");
1003 }
1004
1005 g_object_unref (self); // we grabbed a self ref during async call
1006 }
1007
1008 /* Send a Clone message to the swarm leader */
1009 static void
1010 clone_leader (DeeSharedModel *self)
1011 {
1012 DeeSharedModelPrivate *priv;
1013 GSList *iter;
1014
1015 g_return_if_fail (DEE_IS_SHARED_MODEL (self));
1016 g_return_if_fail (dee_peer_get_swarm_leader (self->priv->swarm) != NULL);
1017 g_return_if_fail (self->priv->revision_queue == NULL);
1018 g_return_if_fail (dee_model_get_n_rows (DEE_MODEL (self)) == 0);
1019
1020 priv = self->priv;
1021
1022 trace_object (self, "Cloning leader '%s'",
1023 dee_shared_model_get_swarm_name (self));
1024
1025 /* This shouldn't really happen when we have multiple connections, but let's
1026 * have it here for consistency */
1027 for (iter = priv->connections; iter != NULL; iter = iter->next)
1028 {
1029 g_dbus_connection_call((GDBusConnection*) iter->data,
1030 dee_shared_model_get_swarm_name (self), // name
1031 priv->model_path, // obj path
1032 "com.canonical.Dee.Model", // iface
1033 "Clone", // member
1034 NULL, // args
1035 G_VARIANT_TYPE ("(sasaavauay(tt))"), // ret type
1036 G_DBUS_CALL_FLAGS_NONE,
1037 -1, // timeout
1038 NULL, // cancel
1039 on_clone_received, // cb
1040 g_object_ref (self)); // userdata
1041 }
1042 }
1043
1044 static void
1045 on_dbus_signal_received (GDBusConnection *connection,
1046 const gchar *sender_name,
1047 const gchar *object_path,
1048 const gchar *interface_name,
1049 const gchar *signal_name,
1050 GVariant *parameters,
1051 gpointer user_data)
1052 {
1053 DeeSharedModel *model;
1054 const gchar *unique_name;
1055
1056 g_return_if_fail (DEE_IS_SHARED_MODEL (user_data));
1057
1058 unique_name = g_dbus_connection_get_unique_name (connection);
1059
1060 trace_object (user_data, "%s: sender: %s, our unique_name: %s",
1061 __func__, sender_name, unique_name);
1062
1063 /* Ignore signals from our selves. We may get those because of the way
1064 * we set up the match rules */
1065 if (unique_name != NULL && g_strcmp0 (sender_name,
1066 g_dbus_connection_get_unique_name (connection)) == 0)
1067 return;
1068
1069 if (g_strcmp0 (signal_name, "Commit") == 0)
1070 {
1071 model = DEE_SHARED_MODEL (user_data);
1072 commit_transaction (model, sender_name, parameters);
1073
1074 if (g_slist_length (model->priv->connections) > 1)
1075 {
1076 /* this is a server and a client (non-leader) just committed a change
1077 * to the model, let's invalidate all other clients */
1078 invalidate_peer (model, sender_name, connection);
1079 }
1080 }
1081 else
1082 g_warning ("Unexpected signal %s.%s from %s",
1083 interface_name, signal_name, sender_name);
1084 }
1085
1086
1087 static void
1088 on_leader_changed (DeeSharedModel *self)
1089 {
1090 DeeSharedModelPrivate *priv;
1091
1092 priv = self->priv;
1093
1094 if (dee_shared_model_is_leader (self))
1095 {
1096 /* The leader is the authoritative data source so if we are not
1097 * synchronized we will now be by very definition */
1098 if (!priv->synchronized)
1099 {
1100 priv->synchronized = TRUE;
1101 g_object_notify (G_OBJECT (self), "synchronized");
1102 }
1103 }
1104 else
1105 {
1106 if (!priv->synchronized)
1107 {
1108 clone_leader (self);
1109 }
1110 }
1111 }
1112
1113 static void
1114 commit_transaction (DeeSharedModel *self,
1115 const gchar *sender_name,
1116 GVariant *transaction)
1117 {
1118 DeeSharedModelPrivate *priv;
1119 GVariantIter iter;
1120 GVariant *schema, *row, **row_buf, *val, *aav, *au, *ay, *tt;
1121 const gchar **column_schemas;
1122 gsize column_schemas_len;
1123 gchar *swarm_name;
1124 guint64 seqnum_before, seqnum_after, current_seqnum;
1125 guint64 n_rows, n_cols, model_n_rows;
1126 guint32 pos;
1127 guchar change_type;
1128 gint i, j;
1129
1130 g_return_if_fail (DEE_IS_SHARED_MODEL (self));
1131 g_return_if_fail (transaction != NULL);
1132
1133 g_variant_ref_sink (transaction);
1134
1135 priv = self->priv;
1136 g_variant_iter_init (&iter, transaction);
1137
1138 /* The transaction should have signature '(sasaavauay(tt)'.
1139 * Make sure it at least looks right */
1140 if (g_strcmp0 (g_variant_get_type_string (transaction), "(sasaavauay(tt))") != 0)
1141 {
1142 g_critical ("Unexpected format for Commit message '%s' from %s. "
1143 "Expected '(sasaavauay(tt))'",
1144 g_variant_get_type_string (transaction), sender_name);
1145 g_variant_unref (transaction);
1146 return;
1147 }
1148
1149 /* Assert that this is a Commit for the right swarm name */
1150 g_variant_iter_next (&iter, "s", &swarm_name);
1151 if (g_strcmp0 (swarm_name, dee_peer_get_swarm_name (priv->swarm)) != 0)
1152 {
1153 g_critical ("Error in internal message routing. "
1154 "Unexpected swarm name '%s' on Commit from %s."
1155 "Expected '%s'",
1156 swarm_name, sender_name,
1157 dee_peer_get_swarm_name (priv->swarm));
1158 g_variant_unref (transaction);
1159 return;
1160 }
1161
1162 /* If the model has no schema then use the one received in the transaction */
1163 schema = g_variant_iter_next_value (&iter);
1164 n_cols = dee_model_get_n_columns (DEE_MODEL (self));
1165 if (n_cols == 0)
1166 {
1167 column_schemas = g_variant_get_strv (schema, &column_schemas_len);
1168 if (column_schemas != NULL)
1169 {
1170 n_cols = column_schemas_len;
1171 dee_model_set_schema_full (DEE_MODEL(self), column_schemas, n_cols);
1172 g_free (column_schemas);
1173 }
1174 else
1175 {
1176 g_warning ("Received transaction before the model schema has been set"
1177 " and none received from leader");
1178 g_variant_unref (transaction);
1179 g_variant_unref (schema);
1180 return;
1181 }
1182 }
1183 g_variant_unref (schema);
1184
1185 /* Parse the rest of the transaction */
1186 aav = g_variant_iter_next_value (&iter);
1187 au = g_variant_iter_next_value (&iter);
1188 ay = g_variant_iter_next_value (&iter);
1189 tt = g_variant_iter_next_value (&iter);
1190
1191 /* Validate that the seqnums are as we expect */
1192 g_variant_get (tt, "(tt)", &seqnum_before, &seqnum_after);
1193 g_variant_unref (tt);
1194
1195 /* If this is our first transaction we accept anything, if not the
1196 * incoming seqnums must align with our own records */
1197 current_seqnum = dee_serializable_model_get_seqnum (DEE_MODEL (self));
1198 if (current_seqnum != 0 &&
1199 current_seqnum != seqnum_before)
1200 {
1201 g_warning ("Transaction from %s is in the %s. Expected seqnum %"G_GUINT64_FORMAT
1202 ", but got %"G_GUINT64_FORMAT". Ignoring transaction.",
1203 sender_name,
1204 current_seqnum < seqnum_before ? "future" : "past",
1205 current_seqnum, seqnum_before);
1206 if (dee_shared_model_is_leader (self))
1207 {
1208 g_warning ("Invalidating %s", sender_name);
1209 invalidate_peer (self, sender_name, NULL);
1210 }
1211
1212 g_variant_unref (transaction);
1213 g_variant_unref (aav);
1214 g_variant_unref (au);
1215 g_variant_unref (ay);
1216 return;
1217 }
1218
1219 /* Check that the lengths of all the arrays match up */
1220 n_rows = g_variant_n_children (aav);
1221 if (n_rows != g_variant_n_children (au))
1222 {
1223 g_warning ("Commit from %s has illegal position vector",
1224 sender_name);
1225 // FIXME cleanup
1226 }
1227 if (n_rows != g_variant_n_children (ay))
1228 {
1229 g_warning ("Commit from %s has illegal change type vector",
1230 sender_name);
1231 // FIXME cleanup
1232 }
1233 if (n_rows > (seqnum_after - seqnum_before))
1234 {
1235 g_warning ("Commit from %s has illegal seqnum count.",
1236 sender_name);
1237 }
1238
1239 /* Allocate an array on the stack as a temporary row data buffer */
1240 row_buf = g_alloca (n_cols * sizeof (gpointer));
1241
1242 trace_object (self, "Applying transaction of %i rows", n_rows);
1243
1244 /* Phew. Finally. We're ready to merge the changes */
1245 g_signal_emit (self, _signals[BEGIN_TRANSACTION], 0, seqnum_before, seqnum_after);
1246 priv->suppress_remote_signals = TRUE;
1247 for (i = 0; i < n_rows; i++) /* Begin outer loop */
1248 {
1249 model_n_rows = dee_model_get_n_rows (DEE_MODEL (self));
1250
1251 g_variant_get_child (au, i, "u", &pos);
1252 g_variant_get_child (ay, i, "y", &change_type);
1253
1254 /* Before parsing the row data we check if it's a remove,
1255 * because in that case we might as well not parse the
1256 * row data at all */
1257 if (change_type == CHANGE_TYPE_REMOVE)
1258 {
1259 dee_model_remove (DEE_MODEL (self),
1260 dee_model_get_iter_at_row (DEE_MODEL (self), pos));
1261 model_n_rows--;
1262 continue;
1263 }
1264
1265 if (change_type == CHANGE_TYPE_CLEAR)
1266 {
1267 dee_model_clear (DEE_MODEL (self));
1268 model_n_rows = 0;
1269 continue;
1270 }
1271
1272 /* It's an Add or Change so parse the row data */
1273 row = g_variant_get_child_value (aav, i);
1274
1275 /* Add and Change rows must have the correct number of columns */
1276 if (g_variant_n_children (row) != n_cols)
1277 {
1278 g_critical ("Commit from %s contains rows of illegal length. "
1279 "The model may have been left in a dirty state",
1280 sender_name);
1281 // FIXME: cleanup
1282 return;
1283 }
1284
1285 /* Read the row cells into our stack allocated row buffer.
1286 * Note that g_variant_get_child_value() returns a strong ref,
1287 * not a floating one */
1288 for (j = 0; j < n_cols; j++)
1289 {
1290 val = g_variant_get_child_value (row, j); // val is now a 'v'
1291 row_buf[j] = g_variant_get_child_value (val, 0); // unbox the 'v'
1292 g_variant_unref (val);
1293 }
1294
1295 if (change_type == CHANGE_TYPE_ADD)
1296 {
1297 if (pos == 0)
1298 dee_model_prepend_row (DEE_MODEL (self), row_buf);
1299 else if (pos >= model_n_rows)
1300 dee_model_append_row (DEE_MODEL (self), row_buf);
1301 else if (pos < model_n_rows)
1302 dee_model_insert_row (DEE_MODEL (self), pos, row_buf);
1303
1304 }
1305 else if (change_type == CHANGE_TYPE_CHANGE)
1306 {
1307 dee_model_set_row (DEE_MODEL (self),
1308 dee_model_get_iter_at_row (DEE_MODEL (self), pos),
1309 row_buf);
1310 }
1311 else
1312 {
1313 g_critical ("Unknown change type %i from %s. The model may have "
1314 "been left in a dirty state", change_type, sender_name);
1315 // FIXME: continue looping or bail out?
1316 }
1317
1318 /* Free the variants in the row_buf. */
1319 for (j = 0; j < n_cols; j++)
1320 g_variant_unref (row_buf[j]);
1321
1322 g_variant_unref (row);
1323 } /* End outer loop */
1324 priv->suppress_remote_signals = FALSE;
1325
1326 g_variant_unref (transaction);
1327 g_variant_unref (aav);
1328 g_variant_unref (au);
1329 g_variant_unref (ay);
1330
1331 /* We must manually override the seqnum in case we started off from
1332 * zero our selves, but the transaction was a later snapshot */
1333 dee_serializable_model_set_seqnum (DEE_MODEL (self), seqnum_after);
1334
1335 priv->last_committed_seqnum = seqnum_after;
1336
1337 g_signal_emit (self, _signals[END_TRANSACTION], 0, seqnum_before, seqnum_after);
1338 }
1339
1340 static void
1341 on_self_row_added (DeeModel *self, DeeModelIter *iter)
1342 {
1343 DeeSharedModelPrivate *priv;
1344 gsize row_slice_size;
1345 guint32 pos;
1346 GVariant **row;
1347
1348 priv = DEE_SHARED_MODEL (self)->priv;
1349
1350 if (!priv->suppress_remote_signals)
1351 {
1352 row_slice_size = dee_model_get_n_columns(self) * sizeof (gpointer);
1353 row = g_slice_alloc (row_slice_size);
1354
1355 pos = dee_model_get_position (self, iter);
1356 enqueue_revision (self,
1357 CHANGE_TYPE_ADD,
1358 pos,
1359 dee_serializable_model_get_seqnum (self),
1360 dee_model_get_row (self, iter, row));
1361 }
1362 }
1363
1364 static void
1365 on_self_row_removed (DeeModel *self, DeeModelIter *iter)
1366 {
1367 DeeSharedModelPrivate *priv;
1368 guint32 pos;
1369
1370 priv = DEE_SHARED_MODEL (self)->priv;
1371
1372 if (!priv->suppress_remote_signals)
1373 {
1374 pos = dee_model_get_position (self, iter);
1375 enqueue_revision (self,
1376 CHANGE_TYPE_REMOVE,
1377 pos,
1378 dee_serializable_model_get_seqnum (self),
1379 NULL);
1380 }
1381 }
1382
1383 static void
1384 on_self_row_changed (DeeModel *self, DeeModelIter *iter)
1385 {
1386 DeeSharedModelPrivate *priv;
1387 guint32 pos;
1388 gsize row_slice_size;
1389 GVariant **row;
1390
1391 priv = DEE_SHARED_MODEL (self)->priv;
1392
1393 if (!priv->suppress_remote_signals)
1394 {
1395 row_slice_size = dee_model_get_n_columns(self) * sizeof (gpointer);
1396 row = g_slice_alloc (row_slice_size);
1397
1398 pos = dee_model_get_position (self, iter);
1399 enqueue_revision (self,
1400 CHANGE_TYPE_CHANGE,
1401 pos,
1402 dee_serializable_model_get_seqnum (self),
1403 dee_model_get_row (self, iter, row));
1404 }
1405 }
1406
1407 /* Clears all data in the model and resets it to start from scratch */
1408 static void
1409 reset_model (DeeModel *self)
1410 {
1411 g_return_if_fail (DEE_IS_SHARED_MODEL (self));
1412
1413 /* Make sure we don't have any buffered signals awaiting emission */
1414 flush_revision_queue (self);
1415
1416 /* Emit 'removed' on all rows and free old row data */
1417 dee_model_clear (self);
1418
1419 dee_serializable_model_set_seqnum (self, 0);
1420 }
1421
1422 /* Call DBus method com.canonical.Dee.Model.Invalidate() on @sender_name */
1423 static void
1424 invalidate_peer (DeeSharedModel *self,
1425 const gchar *sender_name,
1426 GDBusConnection *except)
1427 {
1428 DeeSharedModelPrivate *priv;
1429 GSList *iter;
1430
1431 g_return_if_fail (DEE_IS_SHARED_MODEL (self));
1432
1433 if (!dee_shared_model_is_leader (self))
1434 {
1435 g_critical ("Internal error in DeeSharedModel. "
1436 "Non-leader model tried to invalidate a peer");
1437 return;
1438 }
1439
1440 priv = self->priv;
1441
1442 // invalidate peers on all connections
1443 for (iter = priv->connections; iter != NULL; iter = iter->next)
1444 {
1445 if (iter->data == except) continue;
1446 g_dbus_connection_call ((GDBusConnection*) iter->data,
1447 sender_name,
1448 priv->model_path,
1449 "com.canonical.Dee.Model",
1450 "Invalidate",
1451 NULL, /* params */
1452 NULL, /* reply type */
1453 G_DBUS_CALL_FLAGS_NONE,
1454 -1, /* timeout */
1455 NULL, /* cancel */
1456 NULL, /* cb */
1457 NULL); /* user data */
1458 }
1459 }
1460
1461 /* Public Methods */
1462
1463 /**
1464 * dee_shared_model_new:
1465 * @name: A well known name to publish this model under. Models sharing this name
1466 * will synchronize with each other
1467 *
1468 * Create a new empty shared model without any column schema associated.
1469 * The column schema will be set in one of two ways: firstly you may set it
1470 * manually with dee_model_set_schema() or secondly it will be set once
1471 * the first rows are exchanged with a peer model.
1472 *
1473 * A #DeeSharedModel with a schema manually set has to be created before
1474 * creating more #DeeSharedModel with the same @name.
1475 *
1476 * A shared model created with this constructor will store row data in a
1477 * suitably picked memory backed model.
1478 *
1479 * Return value: (transfer full) (type DeeSharedModel): a new #DeeSharedModel
1480 */
1481 DeeModel*
1482 dee_shared_model_new (const gchar *name)
1483 {
1484 DeeModel *self;
1485
1486 g_return_val_if_fail (name != NULL, NULL);
1487
1488 self = dee_shared_model_new_with_back_end(name,
1489 dee_sequence_model_new ());
1490
1491 return self;
1492 }
1493
1494 /**
1495 * dee_shared_model_new_for_peer:
1496 * @peer: (transfer full): A #DeePeer instance.
1497 *
1498 * Create a new empty shared model without any column schema associated.
1499 * The column schema will be set in one of two ways: firstly you may set it
1500 * manually with dee_model_set_schema() or secondly it will be set once
1501 * the first rows are exchanged with a peer model.
1502 *
1503 * A #DeeSharedModel with a schema manually set has to be created before
1504 * creating more #DeeSharedModel with the same @name.
1505 *
1506 * A shared model created with this constructor will store row data in a
1507 * suitably picked memory backed model.
1508 *
1509 * Return value: (transfer full) (type DeeSharedModel): a new #DeeSharedModel
1510 */
1511 DeeModel*
1512 dee_shared_model_new_for_peer (DeePeer *peer)
1513 {
1514 DeeModel *self;
1515 DeeModel *back_end;
1516
1517 g_return_val_if_fail (peer != NULL, NULL);
1518
1519 back_end = (DeeModel*) dee_sequence_model_new ();
1520
1521 self = g_object_new (DEE_TYPE_SHARED_MODEL,
1522 "back-end", back_end,
1523 "peer", peer,
1524 NULL);
1525
1526 g_object_unref (back_end);
1527 g_object_unref (peer);
1528
1529 return self;
1530 }
1531
1532 /**
1533 * dee_shared_model_new_with_back_end:
1534 * @name: (transfer none): A well known name to publish this model under.
1535 * Models sharing this name will synchronize with each other
1536 * @back_end: (transfer full): The #DeeModel that will actually store
1537 * the model data. Ownership of the ref to @back_end is transfered to
1538 * the shared model.
1539 *
1540 * Create a new shared model storing all data in @back_end.
1541 *
1542 * The model will start synchronizing with peer models as soon as possible and
1543 * the #DeeSharedModel:synchronized property will be set once finished.
1544 *
1545 * Return value: (transfer full) (type DeeSharedModel): a new #DeeSharedModel
1546 */
1547 DeeModel*
1548 dee_shared_model_new_with_back_end (const gchar *name, DeeModel *back_end)
1549 {
1550 DeeModel *self;
1551 DeePeer *swarm;
1552
1553 g_return_val_if_fail (name != NULL, NULL);
1554
1555 swarm = g_object_new (DEE_TYPE_PEER,
1556 "swarm-name", name,
1557 NULL);
1558
1559 self = g_object_new (DEE_TYPE_SHARED_MODEL,
1560 "back-end", back_end,
1561 "peer", swarm,
1562 NULL);
1563
1564 g_object_unref (back_end);
1565 g_object_unref (swarm);
1566
1567 return self;
1568 }
1569
1570 /**
1571 * dee_shared_model_get_swarm_name:
1572 * @self: The model to get the name for
1573 *
1574 * Convenience function for accessing the #DeePeer:swarm-name property of the
1575 * #DeePeer defined in the #DeeSharedModel:peer property.
1576 *
1577 * Returns: The name of the swarm this model synchrnonizes with
1578 */
1579 const gchar*
1580 dee_shared_model_get_swarm_name (DeeSharedModel *self)
1581 {
1582 DeeSharedModelPrivate *priv;
1583
1584 g_return_val_if_fail (DEE_IS_SHARED_MODEL (self), NULL);
1585
1586 priv = self->priv;
1587 return dee_peer_get_swarm_name (priv->swarm);
1588 }
1589
1590 /**
1591 * dee_shared_model_get_peer:
1592 * @self: The model to get the #DeePeer for
1593 *
1594 * Convenience function for accessing the #DeeSharedModel:peer property
1595 *
1596 * Returns: (transfer none): The #DeePeer used to interact with the peer models
1597 */
1598 DeePeer*
1599 dee_shared_model_get_peer (DeeSharedModel *self)
1600 {
1601 DeeSharedModelPrivate *priv;
1602
1603 g_return_val_if_fail (DEE_IS_SHARED_MODEL (self), NULL);
1604
1605 priv = self->priv;
1606 return priv->swarm;
1607 }
1608
1609 /**
1610 * dee_shared_model_is_leader:
1611 * @self: The model to inspect
1612 *
1613 * Check if the model is the swarm leader. This is a convenience function for
1614 * accessing the #DeeSharedModel:peer property and checking if it's the swarm
1615 * leader.
1616 *
1617 * Returns: The value of dee_peer_is_swarm_leader() for the #DeePeer used by
1618 * this shared model
1619 */
1620 gboolean
1621 dee_shared_model_is_leader (DeeSharedModel *self)
1622 {
1623 DeeSharedModelPrivate *priv;
1624
1625 g_return_val_if_fail (DEE_IS_SHARED_MODEL (self), FALSE);
1626
1627 priv = self->priv;
1628 return dee_peer_is_swarm_leader (priv->swarm);
1629 }
1630
1631 /**
1632 * dee_shared_model_is_synchronized:
1633 * @self: The model to inspect
1634 *
1635 * Check if the model is synchronized with its peers. Before modifying a
1636 * shared model in any way (except dee_model_set_schema()) you should wait for
1637 * it to become synchronized. This is normally done by waiting for the
1638 * "notify::synchronized" signal.
1639 *
1640 * This method is purely a convenience function for accessing the
1641 * #DeeSharedModel:synchronized property.
1642 *
1643 * Returns: The value of the :synchronized property
1644 */
1645 gboolean
1646 dee_shared_model_is_synchronized (DeeSharedModel *self)
1647 {
1648 DeeSharedModelPrivate *priv;
1649
1650 g_return_val_if_fail (DEE_IS_SHARED_MODEL (self), FALSE);
1651
1652 priv = self->priv;
1653 return priv->synchronized;
1654 }
1655
1656 /**
1657 * dee_shared_model_flush_revision_queue:
1658 * @self: The shared model to flush the revision queue on
1659 *
1660 * Expert: All changes to @self that has not yet been propagated to the peer
1661 * models are send. This will block the mainloop until all the underlying
1662 * transport streams has been flushed.
1663 *
1664 * Normally #DeeSharedModel collects changes to @self into batches and sends
1665 * them automatically to all peers. You can use this call to provide fine
1666 * grained control of exactly when changes to @self are synchronized to its
1667 * peers. This may for example be useful to improve the interactivity of your
1668 * application if you have a model-process which intermix small and light
1669 * changes with big and expensive changes. Using this call you can make sure
1670 * the model-process dispatches small changes more aggresively to the
1671 * view-process, while holding on to the expensive changes a bit longer.
1672 *
1673 * <emphasis>Important</emphasis>: This method <emphasis>may</emphasis> flush
1674 * your internal queue of DBus messages forcing them to be send before this call
1675 * returns.
1676 *
1677 * Return value: The number of revisions flushed.
1678 */
1679 guint
1680 dee_shared_model_flush_revision_queue (DeeSharedModel *self)
1681 {
1682 DeeSharedModelPrivate *priv;
1683 GError *error;
1684 GSList *iter;
1685 guint n_revisions;
1686
1687 g_return_val_if_fail (DEE_IS_SHARED_MODEL (self), 0);
1688
1689 priv = self->priv;
1690 n_revisions = flush_revision_queue (DEE_MODEL (self));
1691
1692 for (iter = priv->connections; iter != NULL; iter = iter->next)
1693 {
1694 error = NULL;
1695 g_dbus_connection_flush_sync ((GDBusConnection*) iter->data, NULL, &error);
1696 if (error)
1697 {
1698 g_critical ("Error when flushing %u revisions of %s@%p: %s",
1699 n_revisions, G_OBJECT_TYPE_NAME (self), self,
1700 error->message);
1701 g_error_free (error);
1702 return 0;
1703 }
1704 }
1705
1706 return n_revisions;
1707 }
1708
1709 static void
1710 dee_shared_model_clear (DeeModel *model)
1711 {
1712 DeeSharedModel *self;
1713 DeeSharedModelPrivate *priv;
1714 DeeModel *backend;
1715 gboolean was_suppressing;
1716 guint64 seqnum;
1717 guint n_rows;
1718
1719 self = DEE_SHARED_MODEL (model);
1720 priv = self->priv;
1721
1722 g_object_get (self, "back-end", &backend, NULL);
1723
1724 was_suppressing = priv->suppress_remote_signals;
1725 seqnum = dee_serializable_model_get_seqnum (model);
1726 n_rows = dee_model_get_n_rows (model);
1727
1728 if (!was_suppressing && n_rows > 0)
1729 {
1730 seqnum += n_rows;
1731 enqueue_revision (model,
1732 CHANGE_TYPE_CLEAR,
1733 0,
1734 seqnum,
1735 NULL);
1736 }
1737 /* make sure we don't enqueue lots of CHANGE_TYPE_REMOVE */
1738 priv->suppress_remote_signals = TRUE;
1739
1740 /* Chain up to parent class impl. This handles the seqnums for us and the
1741 * backend alike. We just hook in before it, really, to player clever
1742 * tricks with the revision queue (inserting a CLEAR and not N*REMOVE) */
1743 ((DeeModelIface*) g_type_interface_peek_parent (DEE_MODEL_GET_IFACE(model)))->clear (model);
1744
1745 priv->suppress_remote_signals = was_suppressing;
1746
1747 g_object_unref (backend);
1748 }
1749
1750 /*
1751 * Dbus Methods
1752 */
1753
1754
1755 /* Build a '(sasaavauay(tt))' suitable for sending in a Clone response */
1756 static GVariant*
1757 dee_shared_model_serialize (DeeSerializable *self)
1758 {
1759 DeeModel *_self;
1760 GVariantBuilder aav, au, ay, clone;
1761 GVariant *val, *tt, *schema;
1762 DeeModelIter *iter;
1763 guint i, j, n_columns;
1764 guint64 last_seqnum;
1765
1766 g_return_val_if_fail (DEE_IS_SHARED_MODEL (self), FALSE);
1767
1768 _self = DEE_MODEL (self);
1769 n_columns = dee_model_get_n_columns (DEE_MODEL (self));
1770
1771 g_variant_builder_init (&aav, G_VARIANT_TYPE ("aav"));
1772 g_variant_builder_init (&au, G_VARIANT_TYPE ("au"));
1773 g_variant_builder_init (&ay, G_VARIANT_TYPE ("ay"));
1774
1775 /* Clone the rows */
1776 i = 0;
1777 iter = dee_model_get_first_iter (_self);
1778 while (!dee_model_is_last (_self, iter))
1779 {
1780 g_variant_builder_open (&aav, G_VARIANT_TYPE ("av"));
1781 for (j = 0; j < n_columns; j++)
1782 {
1783 val = dee_model_get_value (_self, iter, j);
1784 g_variant_builder_add_value (&aav, g_variant_new_variant (val));
1785 g_variant_unref (val);
1786 }
1787 g_variant_builder_close (&aav);
1788
1789 g_variant_builder_add (&au, "u", i);
1790 g_variant_builder_add (&ay, "y", (guchar) CHANGE_TYPE_ADD);
1791
1792 iter = dee_model_next (_self, iter);
1793 i++;
1794 }
1795
1796 /* Collect the schema */
1797 schema = g_variant_new_strv (dee_model_get_schema(_self, NULL), -1);
1798
1799 /* Collect the seqnums */
1800 last_seqnum = dee_serializable_model_get_seqnum (_self);
1801 tt = g_variant_new ("(tt)", last_seqnum - i, last_seqnum);// FIXME last_committed_seqnum
1802
1803 /* Build the final clone */
1804 g_variant_builder_init (&clone, G_VARIANT_TYPE ("(sasaavauay(tt))"));
1805 g_variant_builder_add (&clone, "s", dee_shared_model_get_swarm_name (DEE_SHARED_MODEL (self)));
1806 g_variant_builder_add_value (&clone, schema);
1807 g_variant_builder_add_value (&clone, g_variant_builder_end (&aav));
1808 g_variant_builder_add_value (&clone, g_variant_builder_end (&au));
1809 g_variant_builder_add_value (&clone, g_variant_builder_end (&ay));
1810 g_variant_builder_add_value (&clone, tt);
1811
1812 trace_object (self, "Serialized %u rows. "
1813 "Seqnum range %"G_GUINT64_FORMAT"-%"G_GUINT64_FORMAT,
1814 i, last_seqnum - i, last_seqnum);
1815
1816 return g_variant_builder_end (&clone);
1817 }
1818
1819 /* Handle an incoming Invalidate() message */
1820 static gboolean
1821 on_invalidate (DeeSharedModel *self)
1822 {
1823 DeeSharedModelPrivate *priv;
1824
1825 g_return_val_if_fail (DEE_IS_SHARED_MODEL (self), FALSE);
1826
1827 priv = self->priv;
1828
1829 if (dee_peer_is_swarm_leader (priv->swarm))
1830 {
1831 g_warning ("Refusing to invalidate swarm leader");
1832 return FALSE;
1833 }
1834
1835 trace_object (self, "Model invalidated");
1836
1837 priv->synchronized = FALSE;
1838 priv->suppress_remote_signals = TRUE;
1839 reset_model (DEE_MODEL (self));
1840 clone_leader (self);
1841 priv->suppress_remote_signals = FALSE;
1842
1843 return TRUE;
1844 }
1845
1846 static GObject*
1847 dee_shared_model_parse_serialized (GVariant *data)
1848 {
1849 DeeSharedModel *self;
1850 gchar *swarm_name;
1851
1852 g_variant_get_child (data, 0, "s", &swarm_name);
1853
1854 self = DEE_SHARED_MODEL (dee_shared_model_new (swarm_name));
1855 commit_transaction (self, swarm_name, data);
1856
1857 g_free (swarm_name);
1858
1859 return (GObject *) self;
1860 }
1861
1862 static void
1863 dee_shared_model_serializable_iface_init (DeeSerializableIface *iface)
1864 {
1865 iface->serialize = dee_shared_model_serialize;
1866
1867 dee_serializable_register_parser (DEE_TYPE_SHARED_MODEL,
1868 G_VARIANT_TYPE ("(sasaavauay(tt))"),
1869 dee_shared_model_parse_serialized);
1870 }
1871
1872 static void
1873 dee_shared_model_model_iface_init (DeeModelIface *iface)
1874 {
1875 DeeModelIface *proxy_model_iface;
1876
1877 proxy_model_iface = (DeeModelIface*) g_type_interface_peek_parent (iface);
1878
1879 /* we just need to override clear, but gobject is making this difficult */
1880 iface->set_schema_full = proxy_model_iface->set_schema_full;
1881 iface->get_schema = proxy_model_iface->get_schema;
1882 iface->get_column_schema = proxy_model_iface->get_column_schema;
1883 iface->get_n_columns = proxy_model_iface->get_n_columns;
1884 iface->get_n_rows = proxy_model_iface->get_n_rows;
1885 iface->prepend_row = proxy_model_iface->prepend_row;
1886 iface->append_row = proxy_model_iface->append_row;
1887 iface->insert_row = proxy_model_iface->insert_row;
1888 iface->insert_row_before = proxy_model_iface->insert_row_before;
1889 iface->remove = proxy_model_iface->remove;
1890 iface->set_value = proxy_model_iface->set_value;
1891 iface->set_row = proxy_model_iface->set_row;
1892 iface->get_value = proxy_model_iface->get_value;
1893 iface->get_first_iter = proxy_model_iface->get_first_iter;
1894 iface->get_last_iter = proxy_model_iface->get_last_iter;
1895 iface->get_iter_at_row = proxy_model_iface->get_iter_at_row;
1896 iface->get_bool = proxy_model_iface->get_bool;
1897 iface->get_uchar = proxy_model_iface->get_uchar;
1898 iface->get_int32 = proxy_model_iface->get_int32;
1899 iface->get_uint32 = proxy_model_iface->get_uint32;
1900 iface->get_int64 = proxy_model_iface->get_int64;
1901 iface->get_uint64 = proxy_model_iface->get_uint64;
1902 iface->get_double = proxy_model_iface->get_double;
1903 iface->get_string = proxy_model_iface->get_string;
1904 iface->next = proxy_model_iface->next;
1905 iface->prev = proxy_model_iface->prev;
1906 iface->is_first = proxy_model_iface->is_first;
1907 iface->is_last = proxy_model_iface->is_last;
1908 iface->get_position = proxy_model_iface->get_position;
1909 iface->register_tag = proxy_model_iface->register_tag;
1910 iface->get_tag = proxy_model_iface->get_tag;
1911 iface->set_tag = proxy_model_iface->set_tag;
1912
1913 iface->clear = dee_shared_model_clear;
1914 }