From 0b85740bc98047f0acebf263343ce3218a878b98 Mon Sep 17 00:00:00 2001 From: Joop Kiefte Date: Sat, 11 Dec 2021 03:18:04 +0000 Subject: Add P2P receive capability --- message/message.go | 71 +++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 67 insertions(+), 4 deletions(-) (limited to 'message/message.go') diff --git a/message/message.go b/message/message.go index aa2a3e5..6d8ddcd 100644 --- a/message/message.go +++ b/message/message.go @@ -5,7 +5,9 @@ import ( "crypto/sha256" "encoding/json" "fmt" + "io" "math/bits" + "sync" "time" ipfs "github.com/ipfs/go-ipfs-api" @@ -90,21 +92,82 @@ func New(msg string, n int, timestamp int64) *Message { } // Map Messages maps the stamp of the message to the message itself -type Messages map[string]*Message +type Messages struct { + lock sync.RWMutex + msgs map[string]*Message +} + +// MessagesFromIPFS takes a CID and returns a Messages map +func MessagesFromIPFS(cid string) (*Messages, error) { + emptyMsgs := Messages{msgs: make(map[string]*Message)} + // Create an IPFS instance based on the IPFSGateway + myIPFS := ipfs.NewShell(IPFSGateway) + // Get the JSON from IPFS + jsonr, err := myIPFS.Cat(cid) + if err != nil { + return &emptyMsgs, err + } + jsonb, err := io.ReadAll(jsonr) + if err != nil { + return &emptyMsgs, err + } + // Unmarshal the JSON into a Messages map + var messages Messages + err = json.Unmarshal(jsonb, &(messages.msgs)) + if err != nil { + return &emptyMsgs, err + } + return &messages, nil +} // Add a message to the Messages map func (m *Messages) Add(msg *Message) { - (*m)[msg.Stamp()] = msg + m.lock.Lock() + defer m.lock.Unlock() + if m.msgs == nil { + m.msgs = make(map[string]*Message) + } + m.msgs[msg.Stamp()] = msg +} + +// AddMany adds another Messages map to the current Messages map +func (m *Messages) AddMany(msgs *Messages) { + m.lock.Lock() + defer m.lock.Unlock() + msgs.lock.RLock() + defer msgs.lock.RUnlock() + if m.msgs == nil { + m.msgs = make(map[string]*Message) + } + for _, msg := range msgs.msgs { + m.msgs[msg.Stamp()] = msg + } } // Remove a message from the Messages map by stamp func (m *Messages) Remove(stamp string) { - delete(*m, stamp) + m.lock.Lock() + defer m.lock.Unlock() + delete(m.msgs, stamp) } // Return a JSON representation of the Messages map func (m *Messages) JSON() ([]byte, error) { - return json.Marshal(m) + m.lock.RLock() + defer m.lock.RUnlock() + return json.Marshal(m.msgs) +} + +// Do something with each message in the Messages map +func (m *Messages) Each(f func(msg *Message)) { + m.lock.RLock() + defer m.lock.RUnlock() + if m.msgs == nil { + return + } + for _, msg := range m.msgs { + f(msg) + } } // Add the messages as a JSON object to IPFS -- cgit v1.2.3-70-g09d2