처리 도관 1 개, 동일한 유형의 IO 소스 2 개
GHC Haskell
stm, network-conduit 및 conduit을 사용하는 내 응용 프로그램에서 .NET을 사용하여 자동으로 분기되는 각 소켓에 대한 가닥이 runTCPServer
있습니다. 스트랜드는 방송 TChan을 사용하여 다른 스트랜드와 통신 할 수 있습니다.
이것은 내가 도관 "체인"을 설정하는 방법을 보여줍니다.
그래서 우리가 여기에있는 것은 (각각 헬퍼 도관에 묶인) 두 개의 소스입니다. Packet
이것은 encoder
수용하고로 변할 객체를 생성 한 ByteString
다음 소켓을 보냅니다. 두 입력을 효율적으로 융합하는 데 (성능이 문제가 됨) 많은 어려움을 겪었습니다.
누군가 나를 올바른 방향으로 안내해 주시면 감사하겠습니다.
시도하지 않고이 질문을 게시하는 것은 무례 할 수 있으므로 이전에 시도한 내용을 여기에 넣겠습니다.
TMChan (닫을 수있는 채널)에서 소스를 생성 (차단)하는 함수를 작성 / 체리 선택했습니다.
-- | Takes a generic type of STM chan and, given read and close functionality,
-- returns a conduit 'Source' which consumes the elements of the channel.
chanSource
:: (MonadIO m, MonadSTM m)
=> a -- ^ The channel
-> (a -> STM (Maybe b)) -- ^ The read function
-> (a -> STM ()) -- ^ The close/finalizer function
-> Source m b
chanSource ch readCh closeCh = ConduitM pull
where close = liftSTM $ closeCh ch
pull = PipeM $ liftSTM $ readCh ch >>= translate
translate = return . maybe (Done ()) (HaveOutput pull close)
마찬가지로 Chan을 싱크로 변환하는 기능;
-- | Takes a stream and, given write and close functionality, returns a sink
-- which wil consume elements and broadcast them into the channel
chanSink
:: (MonadIO m, MonadSTM m)
=> a -- ^ The channel
-> (a -> b -> STM()) -- ^ The write function
-> (a -> STM()) -- ^ The close/finalizer function
-> Sink b m ()
chanSink ch writeCh closeCh = ConduitM sink
where close = const . liftSTM $ closeCh ch
sink = NeedInput push close
write = liftSTM . writeCh ch
push x = PipeM $ write x >> return sink
그런 다음 mergeSources는 간단합니다. fork 2 스레드 (내가 정말로하고 싶지 않지만 도대체 무엇인가)는 새로운 항목을 하나의 목록에 넣을 수 있으며 소스를 생성합니다.
-- | Merges a list of 'Source' objects, sinking them into a 'TMChan' and returns
-- a source which consumes the elements of the channel.
mergeSources
:: (MonadIO m, MonadBaseControl IO m, MonadSTM m)
=> [Source (ResourceT m) a] -- ^ The list of sources
-> ResourceT m (Source (ResourceT m) a)
mergeSources sx = liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn
where push c s = s $$ chanSink c writeTMChan closeTMChan
fsrc x c = mapM_ (\s -> resourceForkIO $ push c s) x
retn c = return $ chanSource c readTMChan closeTMChan
이 함수들을 타입 체크하는 데 성공했지만, 타입 체크를 위해이 기능들을 활용하는 데는 실패했습니다.
-- | Helper which represents a conduit chain for each client connection
serverApp :: Application SessionIO
serverApp appdata = do
use ssBroadcast >>= liftIO . atomically . dupTMChan >>= assign ssBroadcast
-- appSource appdata $$ decoder $= protocol =$= encoder =$ appSink appdata
mergsrc $$ protocol $= encoder =$ appSink appdata
where chansrc = chanSource (use ssBroadcast) readTMChan closeTMChan
mergsrc = mergeSources [appSource appdata $= decoder, chansrc]
-- | Structure which holds mutable information for clients
data SessionState = SessionState
{ _ssBroadcast :: TMChan Packet -- ^ Outbound packet broadcast channel
}
makeLenses ''SessionState
-- | A transformer encompassing both SessionReader and SessionState
type Session m = ReaderT SessionReader (StateT SessionState m)
-- | Macro providing Session applied to an IO monad
type SessionIO = Session IO
이 방법은 어쨌든 결함이 있다고 생각합니다. 중간 목록과 변환이 많이 있습니다. 이것은 성능에 좋지 않습니다. 지침 구하기.
PS. From what I can understand, this is not a duplicate of; Fusing conduits with multiple inputs , as in my situation both sources produce the same type and I don't care from which source the Packet
object is produced, as long as I'm not waiting on one while another has objects ready to be consumed.
PPS. I apologize for the usage (and therefore requirement of knowledge) of Lens in example code.
I don't know if it's any help, but I tried to implement Iain's suggestion and made a variant of mergeSources'
that stops as soon as any of the channels does:
mergeSources' :: (MonadIO m, MonadBaseControl IO m)
=> [Source (ResourceT m) a] -- ^ The sources to merge.
-> Int -- ^ The bound of the intermediate channel.
-> ResourceT m (Source (ResourceT m) a)
mergeSources' sx bound = do
c <- liftSTM $ newTBMChan bound
mapM_ (\s -> resourceForkIO $
s $$ chanSink c writeTBMChan closeTBMChan) sx
return $ sourceTBMChan c
(This simple addition is available here).
Some comments to your version of mergeSources
(take them with a grain of salt, it can be I didn't understand something well):
- Using
...TMChan
instead of...TBMChan
seems dangerous. If the writers are faster than the reader, your heap will blow. Looking at your diagram it seems that this can easily happen, if your TCP peer doesn't read data fast enough. So I'd definitely use...TBMChan
with perhaps large but limited bound. You don't need the
MonadSTM m
constraint. All STM stuff is wrapped intoIO
withliftSTM = liftIO . atomically
Maybe this will help you slightly when using
mergeSources'
inserverApp
.Just a cosmetic issue, I found
liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn
very hard to read due to its use of
liftA2
on the(->) r
monad. I'd saydo c <- liftSTM newTMChan fsrc sx c retn c
would be longer, but much easier to read.
Could you perhaps create a self-contained project where it would be possible to play with serverApp
?
참고 URL : https://stackoverflow.com/questions/16757060/one-processing-conduit-2-io-sources-of-the-same-type
'IT TIP' 카테고리의 다른 글
디자인 제안 : llvm 다중 런타임 컨텍스트 (0) | 2020.11.28 |
---|---|
iCloud 및 GEOResourceManifestServerRemoteProxy : geod 경고에 대한 연결 끊김 (0) | 2020.11.28 |
ASP.NET MVC 3에서 사용자 지정 편집기 / 디스플레이 템플릿을 만드는 방법은 무엇입니까? (0) | 2020.11.28 |
동적 핀 보크 프로파일 링 (0) | 2020.11.28 |
Xcode가 다시 시작될 때마다 잘못 배치 된 뷰 (0) | 2020.11.28 |